HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (clamb)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 7e7ff39..ee20e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -78,12 +79,14 @@
.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -104,6 +107,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.fs.CreateFlag.*;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
@@ -1710,4 +1714,39 @@
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
}
+
+ /**
+ * Get the NamenodeProtocol RPC proxy for the NN associated with this
+ * DFSClient object
+ *
+ * @param nameNodeUri the URI of the NN to get a proxy for.
+ *
+ * @return the Namenode RPC proxy associated with this DFSClient object
+ */
+ @VisibleForTesting
+ public static NamenodeProtocol getNamenodeProtocolProxy(Configuration conf,
+ URI nameNodeUri, UserGroupInformation ugi)
+ throws IOException {
+ return NameNodeProxies.createNonHAProxy(conf,
+ NameNode.getAddress(nameNodeUri), NamenodeProtocol.class, ugi, false).
+ getProxy();
+ }
+
+ /**
+ * Get the RefreshUserMappingsProtocol RPC proxy for the NN associated with
+ * this DFSClient object
+ *
+ * @param nameNodeUri the URI of the NN to get a proxy for.
+ *
+ * @return the RefreshUserMappingsProtocol RPC proxy associated with this
+ * DFSClient object
+ */
+ @VisibleForTesting
+ public static RefreshUserMappingsProtocol getRefreshUserMappingsProtocolProxy(
+ Configuration conf, URI nameNodeUri) throws IOException {
+ final AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
+ return NameNodeProxies.createProxy(conf,
+ nameNodeUri, RefreshUserMappingsProtocol.class,
+ nnFallbackToSimpleAuth).getProxy();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index bc3c6b5..6bc1633 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -30,19 +31,24 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -52,6 +58,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
@@ -62,6 +69,8 @@
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
@@ -95,6 +104,9 @@
* By default the refresh is never called.</li>
* <li>-keepResults do not clean up the name-space after execution.</li>
* <li>-useExisting do not recreate the name-space, use existing data.</li>
+ * <li>-namenode will run the test against a namenode in another
+ * process or on another host. If you use this option, the namenode
+ * must have dfs.namenode.fs-limits.min-block-size set to 16.</li>
* </ol>
*
* The benchmark first generates inputs for each thread so that the
@@ -110,11 +122,20 @@
private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
private static final int BLOCK_SIZE = 16;
private static final String GENERAL_OPTIONS_USAGE =
- " [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]";
+ " [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G] |" +
+ " [-namenode <namenode URI>]\n" +
+ " If using -namenode, set the namenode's" +
+ " dfs.namenode.fs-limits.min-block-size to 16.";
static Configuration config;
static NameNode nameNode;
- static NamenodeProtocols nameNodeProto;
+ static NamenodeProtocol nameNodeProto;
+ static ClientProtocol clientProto;
+ static DatanodeProtocol dataNodeProto;
+ static RefreshUserMappingsProtocol refreshUserMappingsProto;
+ static String bpid = null;
+
+ private String namenodeUri = null; // NN URI to use, if specified
NNThroughputBenchmark(Configuration conf) throws IOException {
config = conf;
@@ -263,7 +284,7 @@
for(StatsDaemon d : daemons)
d.start();
} finally {
- while(isInPorgress()) {
+ while(isInProgress()) {
// try {Thread.sleep(500);} catch (InterruptedException e) {}
}
elapsedTime = Time.now() - start;
@@ -274,7 +295,7 @@
}
}
- private boolean isInPorgress() {
+ private boolean isInProgress() {
for(StatsDaemon d : daemons)
if(d.isInProgress())
return true;
@@ -282,10 +303,10 @@
}
void cleanUp() throws IOException {
- nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+ clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
if(!keepResults)
- nameNodeProto.delete(getBaseDir(), true);
+ clientProto.delete(getBaseDir(), true);
}
int getNumOpsExecuted() {
@@ -359,6 +380,12 @@
args.remove(ugrcIndex);
}
+ try {
+ namenodeUri = StringUtils.popOptionWithArgument("-namenode", args);
+ } catch (IllegalArgumentException iae) {
+ printUsage();
+ }
+
String type = args.get(1);
if(OP_ALL_NAME.equals(type)) {
type = getOpName();
@@ -417,7 +444,7 @@
void benchmarkOne() throws IOException {
for(int idx = 0; idx < opsPerThread; idx++) {
if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
- nameNodeProto.refreshUserToGroupsMappings();
+ refreshUserMappingsProto.refreshUserToGroupsMappings();
long stat = statsOp.executeOp(daemonId, idx, arg1);
localNumOpsExecuted++;
localCumulativeTime += stat;
@@ -483,10 +510,10 @@
@Override
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
- nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+ clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
long start = Time.now();
- nameNodeProto.delete(BASE_DIR_NAME, true);
+ clientProto.delete(BASE_DIR_NAME, true);
long end = Time.now();
return end-start;
}
@@ -552,7 +579,7 @@
@Override
void generateInputs(int[] opsPerThread) throws IOException {
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
- nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+ clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
// int generatedFileIdx = 0;
LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
@@ -587,13 +614,13 @@
throws IOException {
long start = Time.now();
// dummyActionNoSynch(fileIdx);
- nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+ clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
clientName, new EnumSetWritable<CreateFlag>(EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
- replication, BLOCK_SIZE, null);
+ replication, BLOCK_SIZE, CryptoProtocolVersion.supported());
long end = Time.now();
for(boolean written = !closeUponCreate; !written;
- written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
+ written = clientProto.complete(fileNames[daemonId][inputIdx],
clientName, null, INodeId.GRANDFATHER_INODE_ID));
return end-start;
}
@@ -656,7 +683,7 @@
@Override
void generateInputs(int[] opsPerThread) throws IOException {
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
- nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+ clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
dirPaths = new String[numThreads][];
@@ -684,7 +711,7 @@
long executeOp(int daemonId, int inputIdx, String clientName)
throws IOException {
long start = Time.now();
- nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx],
+ clientProto.mkdirs(dirPaths[daemonId][inputIdx],
FsPermission.getDefault(), true);
long end = Time.now();
return end-start;
@@ -756,11 +783,11 @@
}
// use the same files for open
super.generateInputs(opsPerThread);
- if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
- && nameNodeProto.getFileInfo(getBaseDir()) == null) {
- nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
+ if(clientProto.getFileInfo(opCreate.getBaseDir()) != null
+ && clientProto.getFileInfo(getBaseDir()) == null) {
+ clientProto.rename(opCreate.getBaseDir(), getBaseDir());
}
- if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
+ if(clientProto.getFileInfo(getBaseDir()) == null) {
throw new IOException(getBaseDir() + " does not exist.");
}
}
@@ -772,7 +799,7 @@
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = Time.now();
- nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+ clientProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
long end = Time.now();
return end-start;
}
@@ -802,7 +829,7 @@
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = Time.now();
- nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
+ clientProto.delete(fileNames[daemonId][inputIdx], false);
long end = Time.now();
return end-start;
}
@@ -832,7 +859,7 @@
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = Time.now();
- nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
+ clientProto.getFileInfo(fileNames[daemonId][inputIdx]);
long end = Time.now();
return end-start;
}
@@ -876,7 +903,7 @@
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
long start = Time.now();
- nameNodeProto.rename(fileNames[daemonId][inputIdx],
+ clientProto.rename(fileNames[daemonId][inputIdx],
destNames[daemonId][inputIdx]);
long end = Time.now();
return end-start;
@@ -932,14 +959,13 @@
new DataStorage(nsInfo),
new ExportedBlockKeys(), VersionInfo.getVersion());
// register datanode
- dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
+ dnRegistration = dataNodeProto.registerDatanode(dnRegistration);
//first block reports
storage = new DatanodeStorage(DatanodeStorage.generateUuid());
final StorageBlockReport[] reports = {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
- nameNodeProto.blockReport(dnRegistration,
- nameNode.getNamesystem().getBlockPoolId(), reports);
+ dataNodeProto.blockReport(dnRegistration, bpid, reports);
}
/**
@@ -951,7 +977,7 @@
// TODO:FEDERATION currently a single block pool is supported
StorageReport[] rep = { new StorageReport(storage, false,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
- DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
+ DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
@@ -1000,7 +1026,7 @@
// register datanode
StorageReport[] rep = { new StorageReport(storage,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
- DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
+ DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
@@ -1039,8 +1065,7 @@
null) };
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
targetStorageID, rdBlocks) };
- nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
- .getNamesystem().getBlockPoolId(), report);
+ dataNodeProto.blockReceivedAndDeleted(receivedDNReg, bpid, report);
}
}
return blocks.length;
@@ -1131,15 +1156,15 @@
FileNameGenerator nameGenerator;
nameGenerator = new FileNameGenerator(getBaseDir(), 100);
String clientName = getClientName(007);
- nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+ clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
for(int idx=0; idx < nrFiles; idx++) {
String fileName = nameGenerator.getNextFileName("ThroughputBench");
- nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
+ clientProto.create(fileName, FsPermission.getDefault(), clientName,
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
- BLOCK_SIZE, null);
+ BLOCK_SIZE, CryptoProtocolVersion.supported());
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
- nameNodeProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
+ clientProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
}
// prepare block reports
for(int idx=0; idx < nrDatanodes; idx++) {
@@ -1151,7 +1176,7 @@
throws IOException {
ExtendedBlock prevBlock = null;
for(int jdx = 0; jdx < blocksPerFile; jdx++) {
- LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
+ LocatedBlock loc = clientProto.addBlock(fileName, clientName,
prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null);
prevBlock = loc.getBlock();
for(DatanodeInfo dnInfo : loc.getLocations()) {
@@ -1162,8 +1187,8 @@
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
- nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
- .getBlock().getBlockPoolId(), report);
+ dataNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration,
+ bpid, report);
}
}
return prevBlock;
@@ -1184,8 +1209,7 @@
long start = Time.now();
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
- nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
- .getBlockPoolId(), report);
+ dataNodeProto.blockReport(dn.dnRegistration, bpid, report);
long end = Time.now();
return end-start;
}
@@ -1315,7 +1339,7 @@
LOG.info("Datanode " + dn + " is decommissioned.");
}
excludeFile.close();
- nameNodeProto.refreshNodes();
+ clientProto.refreshNodes();
}
/**
@@ -1411,8 +1435,6 @@
// Start the NameNode
String[] argv = new String[] {};
- nameNode = NameNode.createNameNode(argv, config);
- nameNodeProto = nameNode.getRpcServer();
List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
OperationStatsBase opStat = null;
@@ -1453,6 +1475,29 @@
opStat = new CleanAllStats(args);
ops.add(opStat);
}
+
+ if (namenodeUri == null) {
+ nameNode = NameNode.createNameNode(argv, config);
+ NamenodeProtocols nnProtos = nameNode.getRpcServer();
+ nameNodeProto = nnProtos;
+ clientProto = nnProtos;
+ dataNodeProto = nnProtos;
+ refreshUserMappingsProto = nnProtos;
+ bpid = nameNode.getNamesystem().getBlockPoolId();
+ } else {
+ FileSystem.setDefaultUri(getConf(), namenodeUri);
+ DistributedFileSystem dfs = (DistributedFileSystem)
+ FileSystem.get(getConf());
+ final URI nnUri = new URI(namenodeUri);
+ nameNodeProto = DFSTestUtil.getNamenodeProtocolProxy(config, nnUri,
+ UserGroupInformation.getCurrentUser());
+ clientProto = dfs.getClient().getNamenode();
+ dataNodeProto = new DatanodeProtocolClientSideTranslatorPB(
+ NameNode.getAddress(nnUri), config);
+ refreshUserMappingsProto =
+ DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnUri);
+ getBlockPoolId(dfs);
+ }
if(ops.size() == 0)
printUsage();
// run each benchmark
@@ -1473,6 +1518,12 @@
return 0;
}
+ private void getBlockPoolId(DistributedFileSystem unused)
+ throws IOException {
+ final NamespaceInfo nsInfo = nameNodeProto.versionRequest();
+ bpid = nsInfo.getBlockPoolID();
+ }
+
public static void main(String[] args) throws Exception {
NNThroughputBenchmark bench = null;
try {