HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (clamb)
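
The new -namenode option runs the benchmark against a NameNode in
another process or on another host. For example, to drive the create
benchmark against a remote NameNode (hostname and counts hypothetical):

  hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark \
      -op create -threads 4 -files 1000 -namenode hdfs://nn-host:8020

The target NameNode must have dfs.namenode.fs-limits.min-block-size
set to 16, since the benchmark creates files with 16-byte blocks.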
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 7e7ff39..ee20e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -78,12 +79,14 @@
         .ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -104,6 +107,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.fs.CreateFlag.*;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
@@ -1710,4 +1714,41 @@
     GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
   }
+
+  /**
+   * Get a NamenodeProtocol RPC proxy for the given NameNode.
+   *
+   * @param conf the configuration to use for the proxy.
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   * @param ugi the user the proxy should act as.
+   * @return the NamenodeProtocol RPC proxy for the given NameNode.
+   */
+  @VisibleForTesting
+  public static NamenodeProtocol getNamenodeProtocolProxy(Configuration conf,
+      URI nameNodeUri, UserGroupInformation ugi)
+      throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf,
+        NameNode.getAddress(nameNodeUri), NamenodeProtocol.class, ugi, false).
+        getProxy();
+  }
+
+  /**
+   * Get a RefreshUserMappingsProtocol RPC proxy for the given NameNode.
+   *
+   * @param conf the configuration to use for the proxy.
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   *
+   * @return the RefreshUserMappingsProtocol RPC proxy for the given
+   * NameNode.
+   */
+  @VisibleForTesting
+  public static RefreshUserMappingsProtocol getRefreshUserMappingsProtocolProxy(
+      Configuration conf, URI nameNodeUri) throws IOException {
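+    // Out-parameter for createProxy(): set to true if the NN falls back
+    // to simple auth; its value is not consulted here.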
+    final AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
+    return NameNodeProxies.createProxy(conf,
+        nameNodeUri, RefreshUserMappingsProtocol.class,
+        nnFallbackToSimpleAuth).getProxy();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index bc3c6b5..6bc1633 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -20,6 +20,7 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -30,19 +31,24 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -52,6 +58,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
@@ -62,6 +69,8 @@
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -95,6 +104,9 @@
  * By default the refresh is never called.</li>
  * <li>-keepResults do not clean up the name-space after execution.</li>
  * <li>-useExisting do not recreate the name-space, use existing data.</li>
+ * <li>-namenode will run the test against a namenode in another
+ * process or on another host. If you use this option, the namenode
+ * must have dfs.namenode.fs-limits.min-block-size set to 16.</li>
  * </ol>
  * 
  * The benchmark first generates inputs for each thread so that the
@@ -110,11 +122,22 @@
   private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
   private static final int BLOCK_SIZE = 16;
   private static final String GENERAL_OPTIONS_USAGE = 
-    "     [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]";
+    "     [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G] |" +
+    " [-namenode <namenode URI>]\n" +
+    "     If using -namenode, set the namenode's" +
+    "         dfs.namenode.fs-limits.min-block-size to 16.";
 
   static Configuration config;
   static NameNode nameNode;
-  static NamenodeProtocols nameNodeProto;
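+  // Separate proxies for each NN protocol, so the benchmark can target
+  // either an in-process NameNode or a remote one (-namenode).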
+  static NamenodeProtocol nameNodeProto;
+  static ClientProtocol clientProto;
+  static DatanodeProtocol dataNodeProto;
+  static RefreshUserMappingsProtocol refreshUserMappingsProto;
+  static String bpid = null;
+
+  private String namenodeUri = null; // NN URI to use, if specified
 
   NNThroughputBenchmark(Configuration conf) throws IOException {
     config = conf;
@@ -263,7 +284,7 @@
         for(StatsDaemon d : daemons)
           d.start();
       } finally {
-        while(isInPorgress()) {
+        while(isInProgress()) {
           // try {Thread.sleep(500);} catch (InterruptedException e) {}
         }
         elapsedTime = Time.now() - start;
@@ -274,7 +295,7 @@
       }
     }
 
-    private boolean isInPorgress() {
+    private boolean isInProgress() {
       for(StatsDaemon d : daemons)
         if(d.isInProgress())
           return true;
@@ -282,10 +303,10 @@
     }
 
     void cleanUp() throws IOException {
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       if(!keepResults)
-        nameNodeProto.delete(getBaseDir(), true);
+        clientProto.delete(getBaseDir(), true);
     }
 
     int getNumOpsExecuted() {
@@ -359,6 +380,12 @@
         args.remove(ugrcIndex);
       }
 
+      try {
+        namenodeUri = StringUtils.popOptionWithArgument("-namenode", args);
+      } catch (IllegalArgumentException iae) {
+        printUsage();
+      }
+
       String type = args.get(1);
       if(OP_ALL_NAME.equals(type)) {
         type = getOpName();
@@ -417,7 +444,7 @@
     void benchmarkOne() throws IOException {
       for(int idx = 0; idx < opsPerThread; idx++) {
         if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
-          nameNodeProto.refreshUserToGroupsMappings();
+          refreshUserMappingsProto.refreshUserToGroupsMappings();
         long stat = statsOp.executeOp(daemonId, idx, arg1);
         localNumOpsExecuted++;
         localCumulativeTime += stat;
@@ -483,10 +510,10 @@
     @Override
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       long start = Time.now();
-      nameNodeProto.delete(BASE_DIR_NAME, true);
+      clientProto.delete(BASE_DIR_NAME, true);
       long end = Time.now();
       return end-start;
     }
@@ -552,7 +579,7 @@
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       // int generatedFileIdx = 0;
       LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
@@ -587,13 +614,13 @@
     throws IOException {
       long start = Time.now();
       // dummyActionNoSynch(fileIdx);
-      nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+      clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
               .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, 
-          replication, BLOCK_SIZE, null);
+          replication, BLOCK_SIZE, CryptoProtocolVersion.supported());
       long end = Time.now();
       for(boolean written = !closeUponCreate; !written; 
-        written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
+        written = clientProto.complete(fileNames[daemonId][inputIdx],
                                     clientName, null, INodeId.GRANDFATHER_INODE_ID));
       return end-start;
     }
@@ -656,7 +683,7 @@
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length";
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
       dirPaths = new String[numThreads][];
@@ -684,7 +711,7 @@
     long executeOp(int daemonId, int inputIdx, String clientName)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx],
+      clientProto.mkdirs(dirPaths[daemonId][inputIdx],
           FsPermission.getDefault(), true);
       long end = Time.now();
       return end-start;
@@ -756,11 +783,11 @@
       }
       // use the same files for open
       super.generateInputs(opsPerThread);
-      if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
-          && nameNodeProto.getFileInfo(getBaseDir()) == null) {
-        nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
+      if(clientProto.getFileInfo(opCreate.getBaseDir()) != null
+          && clientProto.getFileInfo(getBaseDir()) == null) {
+        clientProto.rename(opCreate.getBaseDir(), getBaseDir());
       }
-      if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
+      if(clientProto.getFileInfo(getBaseDir()) == null) {
         throw new IOException(getBaseDir() + " does not exist.");
       }
     }
@@ -772,7 +799,7 @@
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+      clientProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
       long end = Time.now();
       return end-start;
     }
@@ -802,7 +829,7 @@
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
+      clientProto.delete(fileNames[daemonId][inputIdx], false);
       long end = Time.now();
       return end-start;
     }
@@ -832,7 +859,7 @@
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
+      clientProto.getFileInfo(fileNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
     }
@@ -876,7 +903,7 @@
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.rename(fileNames[daemonId][inputIdx],
+      clientProto.rename(fileNames[daemonId][inputIdx],
                       destNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
@@ -932,14 +959,13 @@
           new DataStorage(nsInfo),
           new ExportedBlockKeys(), VersionInfo.getVersion());
       // register datanode
-      dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
+      dnRegistration = dataNodeProto.registerDatanode(dnRegistration);
       //first block reports
       storage = new DatanodeStorage(DatanodeStorage.generateUuid());
       final StorageBlockReport[] reports = {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
-      nameNodeProto.blockReport(dnRegistration, 
-          nameNode.getNamesystem().getBlockPoolId(), reports);
+      dataNodeProto.blockReport(dnRegistration, bpid, reports);
     }
 
     /**
@@ -951,7 +977,7 @@
       // TODO:FEDERATION currently a single block pool is supported
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
@@ -1000,7 +1026,7 @@
       // register datanode
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
@@ -1039,8 +1065,7 @@
                   null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               targetStorageID, rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
-              .getNamesystem().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(receivedDNReg, bpid, report);
         }
       }
       return blocks.length;
@@ -1131,15 +1156,15 @@
       FileNameGenerator nameGenerator;
       nameGenerator = new FileNameGenerator(getBaseDir(), 100);
       String clientName = getClientName(007);
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
+        clientProto.create(fileName, FsPermission.getDefault(), clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
-            BLOCK_SIZE, null);
+            BLOCK_SIZE, CryptoProtocolVersion.supported());
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
-        nameNodeProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
+        clientProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -1151,7 +1176,7 @@
     throws IOException {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
+        LocatedBlock loc = clientProto.addBlock(fileName, clientName,
             prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
@@ -1162,8 +1187,8 @@
               ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
-              .getBlock().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration,
+              bpid, report);
         }
       }
       return prevBlock;
@@ -1184,8 +1209,7 @@
       long start = Time.now();
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
-      nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
-          .getBlockPoolId(), report);
+      dataNodeProto.blockReport(dn.dnRegistration, bpid, report);
       long end = Time.now();
       return end-start;
     }
@@ -1315,7 +1339,7 @@
         LOG.info("Datanode " + dn + " is decommissioned.");
       }
       excludeFile.close();
-      nameNodeProto.refreshNodes();
+      clientProto.refreshNodes();
     }
 
     /**
@@ -1411,8 +1435,6 @@
 
-    // Start the NameNode
+    // Argv for the in-process NameNode, started below unless -namenode is given
     String[] argv = new String[] {};
-    nameNode = NameNode.createNameNode(argv, config);
-    nameNodeProto = nameNode.getRpcServer();
 
     List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
     OperationStatsBase opStat = null;
@@ -1453,6 +1475,33 @@
         opStat = new CleanAllStats(args);
         ops.add(opStat);
       }
+
+      if (namenodeUri == null) {
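+        // In-process NameNode: its RPC server implements all of the
+        // protocol interfaces the benchmark needs.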
+        nameNode = NameNode.createNameNode(argv, config);
+        NamenodeProtocols nnProtos = nameNode.getRpcServer();
+        nameNodeProto = nnProtos;
+        clientProto = nnProtos;
+        dataNodeProto = nnProtos;
+        refreshUserMappingsProto = nnProtos;
+        bpid = nameNode.getNamesystem().getBlockPoolId();
+      } else {
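+        // Remote NameNode: build a separate RPC proxy for each protocol
+        // from the -namenode URI.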
+        FileSystem.setDefaultUri(getConf(), namenodeUri);
+        DistributedFileSystem dfs = (DistributedFileSystem)
+            FileSystem.get(getConf());
+        final URI nnUri = new URI(namenodeUri);
+        nameNodeProto = DFSTestUtil.getNamenodeProtocolProxy(config, nnUri,
+            UserGroupInformation.getCurrentUser());
+        clientProto = dfs.getClient().getNamenode();
+        dataNodeProto = new DatanodeProtocolClientSideTranslatorPB(
+            NameNode.getAddress(nnUri), config);
+        refreshUserMappingsProto =
+            DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnUri);
+        getBlockPoolId();
+      }
       if(ops.size() == 0)
         printUsage();
       // run each benchmark
@@ -1473,6 +1518,12 @@
     return 0;
   }
 
+  /** Look up the target NN's block pool id via NamenodeProtocol. */
+  private void getBlockPoolId() throws IOException {
+    final NamespaceInfo nsInfo = nameNodeProto.versionRequest();
+    bpid = nsInfo.getBlockPoolID();
+  }
+
   public static void main(String[] args) throws Exception {
     NNThroughputBenchmark bench = null;
     try {