YARN-5434. Add -client|server argument for graceful decommission. Contributed by Robert Kanter.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index aa7fc30..4aa3a14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -98,11 +98,17 @@
"Reload the queues' acls, states and scheduler specific " +
"properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file."))
- .put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]",
+ .put("-refreshNodes",
+ new UsageInfo("[-g [timeout in seconds] -client|server]",
"Refresh the hosts information at the ResourceManager. Here "
- + "[-g [timeout in seconds] is optional, if we specify the "
- + "timeout then ResourceManager will wait for timeout before "
- + "marking the NodeManager as decommissioned."))
+ + "[-g [timeout in seconds] -client|server] is optional, if we "
+ + "specify the timeout then ResourceManager will wait for "
+ + "timeout before marking the NodeManager as decommissioned."
+ + " The -client|server indicates if the timeout tracking should"
+ + " be handled by the client or the ResourceManager. The client"
+ + "-side tracking is blocking, while the server-side tracking"
+ + " is not. Omitting the timeout, or a timeout of -1, indicates"
+ + " an infinite timeout."))
.put("-refreshNodesResources", new UsageInfo("",
"Refresh resources of NodeManagers at the ResourceManager."))
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
@@ -230,7 +236,7 @@
summary.append("The full syntax is: \n\n" +
"yarn rmadmin" +
" [-refreshQueues]" +
- " [-refreshNodes [-g [timeout in seconds]]]" +
+ " [-refreshNodes [-g [timeout in seconds] -client|server]]" +
" [-refreshNodesResources]" +
" [-refreshSuperUserGroupsConfiguration]" +
" [-refreshUserToGroupsMappings]" +
@@ -312,7 +318,12 @@
return 0;
}
- private int refreshNodes(long timeout) throws IOException, YarnException {
+ private int refreshNodes(long timeout, String trackingMode)
+ throws IOException, YarnException {
+ if (!"client".equals(trackingMode)) {
+ throw new UnsupportedOperationException(
+ "Only client tracking mode is currently supported.");
+ }
// Graceful decommissioning with timeout
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesRequest gracefulRequest = RefreshNodesRequest
@@ -721,11 +732,18 @@
} else if ("-refreshNodes".equals(cmd)) {
if (args.length == 1) {
exitCode = refreshNodes();
- } else if (args.length == 3) {
+ } else if (args.length == 3 || args.length == 4) {
// if the graceful timeout specified
if ("-g".equals(args[1])) {
- long timeout = validateTimeout(args[2]);
- exitCode = refreshNodes(timeout);
+ long timeout = -1;
+ String trackingMode;
+ if (args.length == 4) {
+ timeout = validateTimeout(args[2]);
+ trackingMode = validateTrackingMode(args[3]);
+ } else {
+ trackingMode = validateTrackingMode(args[2]);
+ }
+ exitCode = refreshNodes(timeout, trackingMode);
} else {
printUsage(cmd, isHAEnabled);
return -1;
@@ -838,6 +856,16 @@
return timeout;
}
+ private String validateTrackingMode(String mode) {
+ // Only the two documented flags are accepted; the leading '-' is dropped.
+ final boolean clientSide = "-client".equals(mode);
+ if (clientSide || "-server".equals(mode)) {
+ return clientSide ? "client" : "server";
+ }
+ // Anything else (including null) is rejected with the offending token.
+ throw new IllegalArgumentException("Invalid mode specified: " + mode);
+ }
+
@Override
public void setConf(Configuration conf) {
if (conf != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
index 1551333..d3161ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
@@ -26,6 +26,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -255,9 +256,9 @@
}
@Test
- public void testRefreshNodesWithGracefulTimeout() throws Exception {
+ public void testRefreshNodesGracefulBeforeTimeout() throws Exception {
// graceful decommission before timeout
- String[] args = { "-refreshNodes", "-g", "1" };
+ String[] args = {"-refreshNodes", "-g", "1", "-client"};
CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class);
HashSet<NodeId> decomNodes = new HashSet<NodeId>();
@@ -267,30 +268,91 @@
assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+ verify(admin, never()).refreshNodes(
+ RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+ }
+ @Test
+ public void testRefreshNodesGracefulHitTimeout() throws Exception {
// Forceful decommission when timeout occurs
- String[] focefulDecomArgs = { "-refreshNodes", "-g", "1" };
- decomNodes = new HashSet<NodeId>();
+ String[] forcefulDecomArgs = {"-refreshNodes", "-g", "1", "-client"};
+ HashSet<NodeId> decomNodes = new HashSet<NodeId>(); // holds node1 below
+ CheckForDecommissioningNodesResponse response = Records
+ .newRecord(CheckForDecommissioningNodesResponse.class);
response.setDecommissioningNodes(decomNodes);
decomNodes.add(NodeId.newInstance("node1", 100));
response.setDecommissioningNodes(decomNodes);
when(admin.checkForDecommissioningNodes(any(
CheckForDecommissioningNodesRequest.class))).thenReturn(response);
+ assertEquals(0, rmAdminCLI.run(forcefulDecomArgs)); // exit code is 0
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+ }
+ @Test
+ public void testRefreshNodesGracefulInfiniteTimeout() throws Exception {
+ testRefreshNodesGracefulInfiniteTimeout(new String[] {
+ "-refreshNodes", "-g", "-1", "-client"}); // -1: infinite timeout
+ }
+
+ @Test
+ public void testRefreshNodesGracefulNoTimeout() throws Exception {
+ // Leaving the timeout out after -g is treated as an infinite timeout.
+ testRefreshNodesGracefulInfiniteTimeout(
+ new String[] {"-refreshNodes", "-g", "-client"});
+ }
+
+ private void testRefreshNodesGracefulInfiniteTimeout(String[] args)
+ throws Exception {
+ // node1 stays decommissioning for four polls; the fifth reports none.
+ Answer<CheckForDecommissioningNodesResponse> pollAnswer =
+ new Answer<CheckForDecommissioningNodesResponse>() {
+ private int remainingPolls = 5;
+ @Override
+ public CheckForDecommissioningNodesResponse answer(
+ InvocationOnMock invocation) throws Throwable {
+ HashSet<NodeId> pendingNodes = new HashSet<NodeId>();
+ remainingPolls--;
+ if (remainingPolls > 0) {
+ pendingNodes.add(NodeId.newInstance("node1", 100));
+ }
+ CheckForDecommissioningNodesResponse reply = Records
+ .newRecord(CheckForDecommissioningNodesResponse.class);
+ reply.setDecommissioningNodes(pendingNodes);
+ return reply;
+ }
+ };
+ when(admin.checkForDecommissioningNodes(any(
+ CheckForDecommissioningNodesRequest.class))).thenAnswer(pollAnswer);
+ // The CLI must exit 0 and must never fall back to a FORCEFUL refresh.
+ assertEquals(0, rmAdminCLI.run(args));
+ verify(admin, atLeastOnce()).refreshNodes(
+ RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+ verify(admin, never()).refreshNodes(
+ RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+ }
+
+ @Test
+ public void testRefreshNodesGracefulInvalidArgs() throws Exception {
// invalid graceful timeout parameter
- String[] invalidArgs = { "-refreshNodes", "-ginvalid", "invalid" };
+ String[] invalidArgs = {"-refreshNodes", "-ginvalid", "invalid", "-client"};
assertEquals(-1, rmAdminCLI.run(invalidArgs));
// invalid timeout
- String[] invalidTimeoutArgs = { "-refreshNodes", "-g", "invalid" };
+ String[] invalidTimeoutArgs = {"-refreshNodes", "-g", "invalid", "-client"};
assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs));
// negative timeout
- String[] negativeTimeoutArgs = { "-refreshNodes", "-g", "-1000" };
+ String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
+
+ // server tracking mode: accepted by the parser but not supported yet
+ String[] serverTrackingArgs = {"-refreshNodes", "-g", "1", "-server"};
+ assertEquals(-1, rmAdminCLI.run(serverTrackingArgs));
+
+ // invalid tracking mode: neither -client nor -server
+ String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
+ assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
}
@Test(timeout=500)
@@ -404,8 +466,8 @@
.toString()
.contains(
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
- "seconds]]] [-refreshNodesResources] [-refreshSuperUserGroups" +
- "Configuration] [-refreshUserToGroupsMappings] " +
+ "seconds] -client|server]] [-refreshNodesResources] [-refresh" +
+ "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
"[username]] [-addToClusterNodeLabels " +
"<\"label1(exclusive=true),label2(exclusive=false),label3\">] " +
@@ -423,8 +485,8 @@
assertTrue(dataOut
.toString()
.contains(
- "-refreshNodes [-g [timeout in seconds]]: Refresh the hosts information at the " +
- "ResourceManager."));
+ "-refreshNodes [-g [timeout in seconds] -client|server]: " +
+ "Refresh the hosts information at the ResourceManager."));
assertTrue(dataOut
.toString()
.contains(
@@ -456,7 +518,8 @@
testError(new String[] { "-help", "-refreshQueues" },
"Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodes" },
- "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", dataErr, 0);
+ "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
+ "-client|server]]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodesResources" },
"Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@@ -495,7 +558,8 @@
assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
oldOutPrintStream.println(dataOut);
String expectedHelpMsg =
- "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] "
+ "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
+ + "seconds] -client|server]] "
+ "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
+ "[-refreshUserToGroupsMappings] "
+ "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"