HDFS-2020. Fix TestDFSUpgradeFromImage by removing the use of DataNode as a singleton. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1130368 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 891afb9..e22e194 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -659,6 +659,9 @@
HDFS-2021. Update numBytesAcked before sending the ack in PacketResponder.
(John George via szetszwo)
+ HDFS-2020. Fix TestDFSUpgradeFromImage by removing the use of DataNode
+ as a singleton. (suresh via todd)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index 51ed956..304a64b 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -63,10 +63,6 @@
private String blockpoolID = ""; // id of the blockpool
- BlockPoolSliceStorage() {
- super(NodeType.DATA_NODE);
- }
-
public BlockPoolSliceStorage(StorageInfo storageInfo, String bpid) {
super(NodeType.DATA_NODE, storageInfo);
blockpoolID = bpid;
@@ -84,13 +80,14 @@
/**
* Analyze storage directories. Recover from previous transitions if required.
*
+ * @param datanode Datanode to which this storage belongs to
* @param nsInfo namespace information
* @param dataDirs storage directories of block pool
* @param startOpt startup option
* @throws IOException on error
*/
- void recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs,
- StartupOption startOpt) throws IOException {
+ void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
+ Collection<File> dataDirs, StartupOption startOpt) throws IOException {
assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion()
: "Block-pool and name-node layout versions must be the same.";
@@ -140,7 +137,7 @@
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(getStorageDir(idx), nsInfo, startOpt);
+ doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
assert getLayoutVersion() == nsInfo.getLayoutVersion()
: "Data-node and name-node layout versions must be the same.";
assert getCTime() == nsInfo.getCTime()
@@ -228,12 +225,13 @@
* Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime Regular
* startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
*
- * @param sd storage directory,
+ * @param dn DataNode to which this storage belongs to
+ * @param sd storage directory <SD>/current/<bpid>
* @param nsInfo namespace info
* @param startOpt startup option
* @throws IOException
*/
- private void doTransition(StorageDirectory sd, // i.e. <SD>/current/<bpid>
+ private void doTransition(DataNode datanode, StorageDirectory sd,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK)
doRollback(sd, nsInfo); // rollback if applicable
@@ -259,7 +257,9 @@
return; // regular startup
// verify necessity of a distributed upgrade
- verifyDistributedUpgradeProgress(nsInfo);
+ UpgradeManagerDatanode um =
+ datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
+ verifyDistributedUpgradeProgress(um, nsInfo);
if (this.layoutVersion > FSConstants.LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo); // upgrade
@@ -474,10 +474,8 @@
LOG.info( hardLink.linkStats.report() );
}
- private void verifyDistributedUpgradeProgress(NamespaceInfo nsInfo)
- throws IOException {
- UpgradeManagerDatanode um =
- DataNode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
+ private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
+ NamespaceInfo nsInfo) throws IOException {
assert um != null : "DataNode.upgradeManager is null.";
um.setUpgradeState(false, getLayoutVersion());
um.initializeUpgrade(nsInfo);
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
index 9504867..535619e 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
@@ -101,7 +100,7 @@
private void waitForInit(String bpid) {
UpgradeManagerDatanode um = null;
if(bpid != null && !bpid.equals(""))
- um = DataNode.getUpgradeManagerDatanode(bpid);
+ um = datanode.getUpgradeManagerDatanode(bpid);
while ((um != null && ! um.isUpgradeCompleted())
|| (getBlockPoolSetSize() < datanode.getAllBpOs().length)
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bbcdf89..c73bc05 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -37,7 +37,6 @@
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.AbstractList;
@@ -353,7 +352,6 @@
DataNodeMetrics metrics;
private InetSocketAddress selfAddr;
- private static volatile DataNode datanodeObject = null;
private volatile String hostName; // Host name of this datanode
private static String dnThreadName;
@@ -398,8 +396,6 @@
final SecureResources resources) throws IOException {
super(conf);
- DataNode.setDataNode(this);
-
try {
hostName = getHostName(conf);
startDataNode(conf, dataDirs, resources);
@@ -590,7 +586,7 @@
reason = "verification is supported only with FSDataset";
}
if (reason == null) {
- directoryScanner = new DirectoryScanner((FSDataset) data, conf);
+ directoryScanner = new DirectoryScanner(this, (FSDataset) data, conf);
directoryScanner.start();
} else {
LOG.info("Periodic Directory Tree Verification scan is disabled because " +
@@ -793,7 +789,8 @@
bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
} else {
// read storage info, lock data dirs and transition fs state if necessary
- storage.recoverTransitionRead(blockPoolId, bpNSInfo, dataDirs, startOpt);
+ storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo,
+ dataDirs, startOpt);
LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
+ blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
+ bpNSInfo);
@@ -1324,7 +1321,7 @@
synchronized UpgradeManagerDatanode getUpgradeManager() {
if(upgradeManager == null)
upgradeManager =
- new UpgradeManagerDatanode(DataNode.getDataNode(), blockPoolId);
+ new UpgradeManagerDatanode(DataNode.this, blockPoolId);
return upgradeManager;
}
@@ -1403,7 +1400,7 @@
conf.getBoolean("dfs.datanode.simulateddatastorage", false);
if (simulatedFSDataset) {
- storage.createStorageID();
+ storage.createStorageID(getPort());
// it would have been better to pass storage as a parameter to
// constructor below - need to augment ReflectionUtils used below.
conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
@@ -1416,7 +1413,7 @@
throw new IOException(StringUtils.stringifyException(e));
}
} else {
- data = new FSDataset(storage, conf);
+ data = new FSDataset(this, storage, conf);
}
}
@@ -1491,15 +1488,6 @@
SocketChannel.open().socket() : new Socket();
}
- private static void setDataNode(DataNode node) {
- datanodeObject = node;
- }
-
- /** Return the DataNode object */
- public static DataNode getDataNode() {
- return datanodeObject;
- }
-
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
throws IOException {
@@ -1551,11 +1539,7 @@
dnId.storageID = createNewStorageId(dnId.getPort());
}
- static String createNewStorageId() {
- return createNewStorageId(datanodeObject.getPort());
- }
-
- private static String createNewStorageId(int port) {
+ static String createNewStorageId(int port) {
/* Return
* "DS-randInt-ipaddr-currentTimeMillis"
* It is considered extermely rare for all these numbers to match
@@ -1728,9 +1712,8 @@
return threadGroup == null ? 0 : threadGroup.activeCount();
}
- static UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
- DataNode dn = getDataNode();
- BPOfferService bpos = dn.blockPoolManager.get(bpid);
+ UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
+ BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos==null) {
return null;
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 6af702f..42f6988 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -109,11 +109,11 @@
this.storageID = newStorageID;
}
- synchronized void createStorageID() {
+ synchronized void createStorageID(int datanodePort) {
if (storageID != null && !storageID.isEmpty()) {
return;
}
- storageID = DataNode.createNewStorageId();
+ storageID = DataNode.createNewStorageId(datanodePort);
}
/**
@@ -130,10 +130,9 @@
* @param startOpt startup option
* @throws IOException
*/
- synchronized void recoverTransitionRead(NamespaceInfo nsInfo,
- Collection<File> dataDirs,
- StartupOption startOpt
- ) throws IOException {
+ synchronized void recoverTransitionRead(DataNode datanode,
+ NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
+ throws IOException {
if (initilized) {
// DN storage has been initialized, no need to do anything
return;
@@ -192,13 +191,13 @@
// During startup some of them can upgrade or rollback
// while others could be uptodate for the regular startup.
for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(getStorageDir(idx), nsInfo, startOpt);
+ doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
"Data-node and name-node layout versions must be the same.";
}
// make sure we have storage id set - if not - generate new one
- createStorageID();
+ createStorageID(datanode.getPort());
// 3. Update all storages. Some of them might have just been formatted.
this.writeAll();
@@ -210,16 +209,17 @@
/**
* recoverTransitionRead for a specific block pool
*
+ * @param datanode DataNode
* @param bpID Block pool Id
* @param nsInfo Namespace info of namenode corresponding to the block pool
* @param dataDirs Storage directories
* @param startOpt startup option
* @throws IOException on error
*/
- void recoverTransitionRead(String bpID, NamespaceInfo nsInfo,
+ void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
// First ensure datanode level format/snapshot/rollback is completed
- recoverTransitionRead(nsInfo, dataDirs, startOpt);
+ recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
// Create list of storage directories for the block pool
Collection<File> bpDataDirs = new ArrayList<File>();
@@ -234,7 +234,7 @@
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
- bpStorage.recoverTransitionRead(nsInfo, bpDataDirs, startOpt);
+ bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
addBlockPoolStorage(bpID, bpStorage);
}
@@ -358,12 +358,14 @@
* Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
* Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
*
+ * @param datanode Datanode to which this storage belongs to
* @param sd storage directory
* @param nsInfo namespace info
* @param startOpt startup option
* @throws IOException
*/
- private void doTransition( StorageDirectory sd,
+ private void doTransition( DataNode datanode,
+ StorageDirectory sd,
NamespaceInfo nsInfo,
StartupOption startOpt
) throws IOException {
@@ -399,7 +401,10 @@
&& this.cTime == nsInfo.getCTime())
return; // regular startup
// verify necessity of a distributed upgrade
- verifyDistributedUpgradeProgress(nsInfo);
+ UpgradeManagerDatanode um =
+ datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
+ verifyDistributedUpgradeProgress(um, nsInfo);
+
// do upgrade
if (this.layoutVersion > FSConstants.LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
@@ -729,11 +734,9 @@
}
}
- private void verifyDistributedUpgradeProgress(
+ private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
NamespaceInfo nsInfo
) throws IOException {
- UpgradeManagerDatanode um =
- DataNode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
assert um != null : "DataNode.upgradeManager is null.";
um.setUpgradeState(false, getLayoutVersion());
um.initializeUpgrade(nsInfo);
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 9b38a5f..68eac2c 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -57,6 +57,7 @@
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
private static final int DEFAULT_SCAN_INTERVAL = 21600;
+ private final DataNode datanode;
private final FSDataset dataset;
private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread;
@@ -221,7 +222,8 @@
}
}
- DirectoryScanner(FSDataset dataset, Configuration conf) {
+ DirectoryScanner(DataNode dn, FSDataset dataset, Configuration conf) {
+ this.datanode = dn;
this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DEFAULT_SCAN_INTERVAL);
@@ -273,7 +275,7 @@
String[] bpids = dataset.getBPIdlist();
for(String bpid : bpids) {
UpgradeManagerDatanode um =
- DataNode.getUpgradeManagerDatanode(bpid);
+ datanode.getUpgradeManagerDatanode(bpid);
if (um != null && !um.isUpgradeCompleted()) {
//If distributed upgrades underway, exit and wait for next cycle.
LOG.warn("this cycle terminating immediately because Distributed Upgrade is in process");
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 41b4797..9faa486 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -1118,6 +1118,7 @@
return f;
}
+ private final DataNode datanode;
final FSVolumeSet volumes;
private final int maxBlocksPerDir;
final ReplicasMap volumeMap;
@@ -1133,7 +1134,9 @@
/**
* An FSDataset has a directory where it loads its data files.
*/
- public FSDataset(DataStorage storage, Configuration conf) throws IOException {
+ public FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
+ throws IOException {
+ this.datanode = datanode;
this.maxBlocksPerDir =
conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
@@ -2000,7 +2003,6 @@
return f;
// if file is not null, but doesn't exist - possibly disk failed
- DataNode datanode = DataNode.getDataNode();
datanode.checkDiskError();
}
@@ -2246,7 +2248,6 @@
*/
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FSVolume vol) {
- DataNode datanode = DataNode.getDataNode();
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
synchronized (this) {
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
index b6df950..091cdaa 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
@@ -125,7 +125,7 @@
// Complete the upgrade by calling the manager method
try {
UpgradeManagerDatanode upgradeManager =
- DataNode.getUpgradeManagerDatanode(bpid);
+ dataNode.getUpgradeManagerDatanode(bpid);
if(upgradeManager != null)
upgradeManager.completeUpgrade();
} catch(IOException e) {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
index ae7caae..10fb1e4 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
@@ -89,8 +89,10 @@
final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
xml.declaration();
+ final ServletContext context = getServletContext();
+ final DataNode datanode = (DataNode) context.getAttribute("datanode");
final Configuration conf =
- new HdfsConfiguration(DataNode.getDataNode().getConf());
+ new HdfsConfiguration(datanode.getConf());
final int socketTimeout = conf.getInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
@@ -99,7 +101,7 @@
try {
final DFSClient dfs = DatanodeJspHelper.getDFSClient(request,
- DataNode.getDataNode(), conf, getUGI(request, conf));
+ datanode, conf, getUGI(request, conf));
final ClientProtocol nnproxy = dfs.getNamenode();
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
filename, nnproxy, socketFactory, socketTimeout);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java b/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
index 0049281..f447ea7 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
@@ -20,10 +20,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
-import java.net.InetSocketAddress;
import java.util.Enumeration;
import java.util.List;
+import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -46,14 +46,14 @@
public static final String CONTENT_LENGTH = "Content-Length";
- static DataNode datanode = DataNode.getDataNode();
-
/** getting a client for connecting to dfs */
protected DFSClient getDFSClient(HttpServletRequest request)
throws IOException, InterruptedException {
final Configuration conf =
(Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
UserGroupInformation ugi = getUGI(request, conf);
+ final ServletContext context = getServletContext();
+ final DataNode datanode = (DataNode) context.getAttribute("datanode");
return DatanodeJspHelper.getDFSClient(request, datanode, conf, ugi);
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index a783f92..85eb8fe 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -231,7 +231,8 @@
fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
- scanner = new DirectoryScanner(fds, CONF);
+ DataNode dn = cluster.getDataNodes().get(0);
+ scanner = new DirectoryScanner(dn, fds, CONF);
scanner.setRetainDiffs(true);
// Add files with 100 blocks