YARN-5434. Add -client|server argument for graceful decommmission. Contributed by Robert Kanter.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index aa7fc30..4aa3a14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -98,11 +98,17 @@
               "Reload the queues' acls, states and scheduler specific " +
                   "properties. \n\t\tResourceManager will reload the " +
                   "mapred-queues configuration file."))
-          .put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]",
+          .put("-refreshNodes",
+              new UsageInfo("[-g [timeout in seconds] -client|server]",
               "Refresh the hosts information at the ResourceManager. Here "
-              + "[-g [timeout in seconds] is optional, if we specify the "
-              + "timeout then ResourceManager will wait for timeout before "
-              + "marking the NodeManager as decommissioned."))
+              + "[-g [timeout in seconds] -client|server] is optional, if we "
+              + "specify the timeout then ResourceManager will wait for "
+              + "timeout before marking the NodeManager as decommissioned."
+              + " The -client|server indicates if the timeout tracking should"
+              + " be handled by the client or the ResourceManager. The client"
+              + "-side tracking is blocking, while the server-side tracking"
+              + " is not. Omitting the timeout, or a timeout of -1, indicates"
+              + " an infinite timeout."))
           .put("-refreshNodesResources", new UsageInfo("",
               "Refresh resources of NodeManagers at the ResourceManager."))
           .put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
@@ -230,7 +236,7 @@
     summary.append("The full syntax is: \n\n" +
     "yarn rmadmin" +
       " [-refreshQueues]" +
-      " [-refreshNodes [-g [timeout in seconds]]]" +
+      " [-refreshNodes [-g [timeout in seconds] -client|server]]" +
       " [-refreshNodesResources]" +
       " [-refreshSuperUserGroupsConfiguration]" +
       " [-refreshUserToGroupsMappings]" +
@@ -312,7 +318,12 @@
     return 0;
   }
 
-  private int refreshNodes(long timeout) throws IOException, YarnException {
+  private int refreshNodes(long timeout, String trackingMode)
+      throws IOException, YarnException {
+    if (!"client".equals(trackingMode)) {
+      throw new UnsupportedOperationException(
+          "Only client tracking mode is currently supported.");
+    }
     // Graceful decommissioning with timeout
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshNodesRequest gracefulRequest = RefreshNodesRequest
@@ -721,11 +732,18 @@
       } else if ("-refreshNodes".equals(cmd)) {
         if (args.length == 1) {
           exitCode = refreshNodes();
-        } else if (args.length == 3) {
+        } else if (args.length == 3 || args.length == 4) {
           // if the graceful timeout specified
           if ("-g".equals(args[1])) {
-            long timeout = validateTimeout(args[2]);
-            exitCode = refreshNodes(timeout);
+            long timeout = -1;
+            String trackingMode;
+            if (args.length == 4) {
+              timeout = validateTimeout(args[2]);
+              trackingMode = validateTrackingMode(args[3]);
+            } else {
+              trackingMode = validateTrackingMode(args[2]);
+            }
+            exitCode = refreshNodes(timeout, trackingMode);
           } else {
             printUsage(cmd, isHAEnabled);
             return -1;
@@ -838,6 +856,16 @@
     return timeout;
   }
 
+  private String validateTrackingMode(String mode) {
+    if ("-client".equals(mode)) {
+      return "client";
+    }
+    if ("-server".equals(mode)) {
+      return "server";
+    }
+    throw new IllegalArgumentException("Invalid mode specified: " + mode);
+  }
+
   @Override
   public void setConf(Configuration conf) {
     if (conf != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
index 1551333..d3161ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
@@ -26,6 +26,7 @@
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -255,9 +256,9 @@
   }
 
   @Test
-  public void testRefreshNodesWithGracefulTimeout() throws Exception {
+  public void testRefreshNodesGracefulBeforeTimeout() throws Exception {
     // graceful decommission before timeout
-    String[] args = { "-refreshNodes", "-g", "1" };
+    String[] args = {"-refreshNodes", "-g", "1", "-client"};
     CheckForDecommissioningNodesResponse response = Records
         .newRecord(CheckForDecommissioningNodesResponse.class);
     HashSet<NodeId> decomNodes = new HashSet<NodeId>();
@@ -267,30 +268,91 @@
     assertEquals(0, rmAdminCLI.run(args));
     verify(admin).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+    verify(admin, never()).refreshNodes(
+        RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+  }
 
+  @Test
+  public void testRefreshNodesGracefulHitTimeout() throws Exception {
     // Forceful decommission when timeout occurs
-    String[] focefulDecomArgs = { "-refreshNodes", "-g", "1" };
-    decomNodes = new HashSet<NodeId>();
+    String[] forcefulDecomArgs = {"-refreshNodes", "-g", "1", "-client"};
+    HashSet<NodeId> decomNodes = new HashSet<NodeId>();
+    CheckForDecommissioningNodesResponse response = Records
+        .newRecord(CheckForDecommissioningNodesResponse.class);
     response.setDecommissioningNodes(decomNodes);
     decomNodes.add(NodeId.newInstance("node1", 100));
     response.setDecommissioningNodes(decomNodes);
     when(admin.checkForDecommissioningNodes(any(
         CheckForDecommissioningNodesRequest.class))).thenReturn(response);
-    assertEquals(0, rmAdminCLI.run(focefulDecomArgs));
+    assertEquals(0, rmAdminCLI.run(forcefulDecomArgs));
     verify(admin).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+  }
 
+  @Test
+  public void testRefreshNodesGracefulInfiniteTimeout() throws Exception {
+    String[] infiniteTimeoutArgs = {"-refreshNodes", "-g", "-1", "-client"};
+    testRefreshNodesGracefulInfiniteTimeout(infiniteTimeoutArgs);
+  }
+
+  @Test
+  public void testRefreshNodesGracefulNoTimeout() throws Exception {
+    // no timeout (infinite timeout)
+    String[] noTimeoutArgs = {"-refreshNodes", "-g", "-client"};
+    testRefreshNodesGracefulInfiniteTimeout(noTimeoutArgs);
+  }
+
+  private void testRefreshNodesGracefulInfiniteTimeout(String[] args)
+      throws Exception {
+    when(admin.checkForDecommissioningNodes(any(
+        CheckForDecommissioningNodesRequest.class))).thenAnswer(
+        new Answer<CheckForDecommissioningNodesResponse>() {
+            private int count = 5;
+            @Override
+            public CheckForDecommissioningNodesResponse answer(
+                InvocationOnMock invocationOnMock) throws Throwable {
+              CheckForDecommissioningNodesResponse response = Records
+                  .newRecord(CheckForDecommissioningNodesResponse.class);
+              HashSet<NodeId> decomNodes = new HashSet<NodeId>();
+              count--;
+              if (count <= 0) {
+                response.setDecommissioningNodes(decomNodes);
+                return response;
+              } else {
+                decomNodes.add(NodeId.newInstance("node1", 100));
+                response.setDecommissioningNodes(decomNodes);
+                return response;
+              }
+            }
+          });
+    assertEquals(0, rmAdminCLI.run(args));
+    verify(admin, atLeastOnce()).refreshNodes(
+        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+    verify(admin, never()).refreshNodes(
+        RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+  }
+
+  @Test
+  public void testRefreshNodesGracefulInvalidArgs() throws Exception {
     // invalid graceful timeout parameter
-    String[] invalidArgs = { "-refreshNodes", "-ginvalid", "invalid" };
+    String[] invalidArgs = {"-refreshNodes", "-ginvalid", "invalid", "-client"};
     assertEquals(-1, rmAdminCLI.run(invalidArgs));
 
     // invalid timeout
-    String[] invalidTimeoutArgs = { "-refreshNodes", "-g", "invalid" };
+    String[] invalidTimeoutArgs = {"-refreshNodes", "-g", "invalid", "-client"};
     assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs));
 
     // negative timeout
-    String[] negativeTimeoutArgs = { "-refreshNodes", "-g", "-1000" };
+    String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
     assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
+
+    // server tracking mode
+    String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
+    assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
+
+    // invalid tracking mode
+    String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
+    assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
   }
 
   @Test(timeout=500)
@@ -404,8 +466,8 @@
           .toString()
           .contains(
               "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
-              "seconds]]] [-refreshNodesResources] [-refreshSuperUserGroups" +
-              "Configuration] [-refreshUserToGroupsMappings] " +
+              "seconds] -client|server]] [-refreshNodesResources] [-refresh" +
+              "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
               "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
               "[username]] [-addToClusterNodeLabels " +
               "<\"label1(exclusive=true),label2(exclusive=false),label3\">] " +
@@ -423,8 +485,8 @@
       assertTrue(dataOut
           .toString()
           .contains(
-              "-refreshNodes [-g [timeout in seconds]]: Refresh the hosts information at the " +
-              "ResourceManager."));
+              "-refreshNodes [-g [timeout in seconds] -client|server]: " +
+              "Refresh the hosts information at the ResourceManager."));
       assertTrue(dataOut
           .toString()
           .contains(
@@ -456,7 +518,8 @@
       testError(new String[] { "-help", "-refreshQueues" },
           "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodes" },
-          "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", dataErr, 0);
+          "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
+          "-client|server]]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodesResources" },
           "Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
       testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@@ -495,7 +558,8 @@
       assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
       oldOutPrintStream.println(dataOut);
       String expectedHelpMsg = 
-          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] "
+          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
+              + "seconds] -client|server]] "
               + "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
               + "[-refreshUserToGroupsMappings] "
               + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"