YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation (#4610)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 42a2260..d6ce729 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -91,6 +91,12 @@
private MutableGaugeInt numGetResourceProfilesFailedRetrieved;
@Metric("# of getResourceProfile failed to be retrieved")
private MutableGaugeInt numGetResourceProfileFailedRetrieved;
+ @Metric("# of getAttributesToNodes failed to be retrieved")
+ private MutableGaugeInt numGetAttributesToNodesFailedRetrieved;
+ @Metric("# of getClusterNodeAttributes failed to be retrieved")
+ private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved;
+ @Metric("# of getNodesToAttributes failed to be retrieved")
+ private MutableGaugeInt numGetNodesToAttributesFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@@ -101,14 +107,11 @@
private MutableRate totalSucceededAppsCreated;
@Metric("Total number of successful Retrieved app reports and latency(ms)")
private MutableRate totalSucceededAppsRetrieved;
- @Metric("Total number of successful Retrieved multiple apps reports and "
- + "latency(ms)")
+ @Metric("Total number of successful Retrieved multiple apps reports and latency(ms)")
private MutableRate totalSucceededMultipleAppsRetrieved;
- @Metric("Total number of successful Retrieved " +
- "appAttempt reports and latency(ms)")
+ @Metric("Total number of successful Retrieved appAttempt reports and latency(ms)")
private MutableRate totalSucceededAppAttemptsRetrieved;
- @Metric("Total number of successful Retrieved getClusterMetrics and "
- + "latency(ms)")
+ @Metric("Total number of successful Retrieved getClusterMetrics and latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;
@@ -144,9 +147,14 @@
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
@Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)")
private MutableRate totalSucceededGetResourceProfilesRetrieved;
-
@Metric("Total number of successful Retrieved getResourceProfile and latency(ms)")
private MutableRate totalSucceededGetResourceProfileRetrieved;
+ @Metric("Total number of successful Retrieved getAttributesToNodes and latency(ms)")
+ private MutableRate totalSucceededGetAttributesToNodesRetrieved;
+ @Metric("Total number of successful Retrieved getClusterNodeAttributes and latency(ms)")
+ private MutableRate totalSucceededGetClusterNodeAttributesRetrieved;
+ @Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)")
+ private MutableRate totalSucceededGetNodesToAttributesRetrieved;
/**
* Provide quantile counters for all latencies.
@@ -176,6 +184,10 @@
private MutableQuantiles moveApplicationAcrossQueuesLatency;
private MutableQuantiles getResourceProfilesLatency;
private MutableQuantiles getResourceProfileLatency;
+ private MutableQuantiles getAttributesToNodesLatency;
+ private MutableQuantiles getClusterNodeAttributesLatency;
+
+ private MutableQuantiles getNodesToAttributesLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@@ -274,6 +286,18 @@
getResourceProfileLatency =
registry.newQuantiles("getResourceProfileLatency",
"latency of get resource profile timeouts", "ops", "latency", 10);
+
+ getAttributesToNodesLatency =
+ registry.newQuantiles("getAttributesToNodesLatency",
+ "latency of get attributes to nodes timeouts", "ops", "latency", 10);
+
+ getClusterNodeAttributesLatency =
+ registry.newQuantiles("getClusterNodeAttributesLatency",
+ "latency of get cluster node attributes timeouts", "ops", "latency", 10);
+
+ getNodesToAttributesLatency =
+ registry.newQuantiles("getNodesToAttributesLatency",
+ "latency of get nodes to attributes timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@@ -421,6 +445,21 @@
}
@VisibleForTesting
+ public long getNumSucceededGetAttributesToNodesRetrieved() {
+ return totalSucceededGetAttributesToNodesRetrieved.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ public long getNumSucceededGetClusterNodeAttributesRetrieved() {
+ return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ public long getNumSucceededGetNodesToAttributesRetrieved() {
+ return totalSucceededGetNodesToAttributesRetrieved.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
}
@@ -546,6 +585,21 @@
}
@VisibleForTesting
+ public double getLatencySucceededGetAttributesToNodesRetrieved() {
+ return totalSucceededGetAttributesToNodesRetrieved.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ public double getLatencySucceededGetClusterNodeAttributesRetrieved() {
+ return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ public double getLatencySucceededGetNodesToAttributesRetrieved() {
+ return totalSucceededGetNodesToAttributesRetrieved.lastStat().mean();
+ }
+
+ @VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
}
@@ -666,6 +720,18 @@
return numGetResourceProfileFailedRetrieved.value();
}
+ public int getAttributesToNodesFailedRetrieved() {
+ return numGetAttributesToNodesFailedRetrieved.value();
+ }
+
+ public int getClusterNodeAttributesFailedRetrieved() {
+ return numGetClusterNodeAttributesFailedRetrieved.value();
+ }
+
+ public int getNodesToAttributesFailedRetrieved() {
+ return numGetNodesToAttributesFailedRetrieved.value();
+ }
+
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@@ -791,6 +857,21 @@
getResourceProfileLatency.add(duration);
}
+ public void succeededGetAttributesToNodesRetrieved(long duration) {
+ totalSucceededGetAttributesToNodesRetrieved.add(duration);
+ getAttributesToNodesLatency.add(duration);
+ }
+
+ public void succeededGetClusterNodeAttributesRetrieved(long duration) {
+ totalSucceededGetClusterNodeAttributesRetrieved.add(duration);
+ getClusterNodeAttributesLatency.add(duration);
+ }
+
+ public void succeededGetNodesToAttributesRetrieved(long duration) {
+ totalSucceededGetNodesToAttributesRetrieved.add(duration);
+ getNodesToAttributesLatency.add(duration);
+ }
+
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@@ -890,4 +971,16 @@
public void incrGetResourceProfileFailedRetrieved() {
numGetResourceProfileFailedRetrieved.incr();
}
+
+ public void incrGetAttributesToNodesFailedRetrieved() {
+ numGetAttributesToNodesFailedRetrieved.incr();
+ }
+
+ public void incrGetClusterNodeAttributesFailedRetrieved() {
+ numGetClusterNodeAttributesFailedRetrieved.incr();
+ }
+
+ public void incrGetNodesToAttributesFailedRetrieved() {
+ numGetNodesToAttributesFailedRetrieved.incr();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 45cec64..7fd1003 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -175,7 +175,6 @@
federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random(System.currentTimeMillis());
-
int numThreads = getConf().getInt(
YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
@@ -195,12 +194,11 @@
LOG.error(e.getMessage());
}
- numSubmitRetries =
- conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
- YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+ numSubmitRetries = conf.getInt(
+ YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
- clientRMProxies =
- new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
+ clientRMProxies = new ConcurrentHashMap<>();
routerMetrics = RouterMetrics.getMetrics();
returnPartialReport = conf.getBoolean(
@@ -227,19 +225,17 @@
ApplicationClientProtocol clientRMProxy = null;
try {
boolean serviceAuthEnabled = getConf().getBoolean(
- CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
UserGroupInformation realUser = user;
if (serviceAuthEnabled) {
- realUser = UserGroupInformation.createProxyUser(
- user.getShortUserName(), UserGroupInformation.getLoginUser());
+ realUser = UserGroupInformation.createProxyUser(user.getShortUserName(),
+ UserGroupInformation.getLoginUser());
}
clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
ApplicationClientProtocol.class, subClusterId, realUser);
} catch (Exception e) {
RouterServerUtil.logAndThrowException(
- "Unable to create the interface to reach the SubCluster "
- + subClusterId,
- e);
+ "Unable to create the interface to reach the SubCluster " + subClusterId, e);
}
clientRMProxies.put(subClusterId, clientRMProxy);
@@ -287,8 +283,7 @@
for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
- LOG.debug(
- "getNewApplication try #{} on SubCluster {}", i, subClusterId);
+ LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null;
@@ -410,7 +405,7 @@
ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId();
- List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
+ List<SubClusterId> blacklist = new ArrayList<>();
for (int i = 0; i < numSubmitRetries; ++i) {
@@ -561,8 +556,8 @@
}
if (response == null) {
- LOG.error("No response when attempting to kill the application "
- + applicationId + " to SubCluster " + subClusterId.getId());
+ LOG.error("No response when attempting to kill the application {} to SubCluster {}.",
+ applicationId, subClusterId.getId());
}
long stopTime = clock.getTime();
@@ -1015,7 +1010,7 @@
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
- new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
+ new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
Collection<GetLabelsToNodesResponse> labelNodes;
try {
labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
@@ -1040,7 +1035,7 @@
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
- new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
+ new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
Collection<GetClusterNodeLabelsResponse> nodeLabels;
try {
nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
@@ -1528,20 +1523,75 @@
@Override
public GetAttributesToNodesResponse getAttributesToNodes(
GetAttributesToNodesRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ if (request == null || request.getNodeAttributes() == null) {
+ routerMetrics.incrGetAttributesToNodesFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Missing getAttributesToNodes request " +
+ "or nodeAttributes.", null);
+ }
+ long startTime = clock.getTime();
+ ClientMethod remoteMethod = new ClientMethod("getAttributesToNodes",
+ new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request});
+ Collection<GetAttributesToNodesResponse> attributesToNodesResponses = null;
+ try {
+ attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+ GetAttributesToNodesResponse.class);
+ } catch (Exception ex) {
+ routerMetrics.incrGetAttributesToNodesFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.",
+ ex);
+ }
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetAttributesToNodesRetrieved(stopTime - startTime);
+ return RouterYarnClientUtils.mergeAttributesToNodesResponse(attributesToNodesResponses);
}
@Override
public GetClusterNodeAttributesResponse getClusterNodeAttributes(
- GetClusterNodeAttributesRequest request)
- throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ GetClusterNodeAttributesRequest request) throws YarnException, IOException {
+ if (request == null) {
+ routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Missing getClusterNodeAttributes request.", null);
+ }
+ long startTime = clock.getTime();
+ ClientMethod remoteMethod = new ClientMethod("getClusterNodeAttributes",
+ new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request});
+ Collection<GetClusterNodeAttributesResponse> clusterNodeAttributesResponses = null;
+ try {
+ clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+ GetClusterNodeAttributesResponse.class);
+ } catch (Exception ex) {
+ routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Unable to get cluster node attributes due " +
+ " to exception.", ex);
+ }
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetClusterNodeAttributesRetrieved(stopTime - startTime);
+ return RouterYarnClientUtils.mergeClusterNodeAttributesResponse(clusterNodeAttributesResponses);
}
@Override
public GetNodesToAttributesResponse getNodesToAttributes(
GetNodesToAttributesRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ if (request == null || request.getHostNames() == null) {
+ routerMetrics.incrGetNodesToAttributesFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Missing getNodesToAttributes request or " +
+ "hostNames.", null);
+ }
+ long startTime = clock.getTime();
+ ClientMethod remoteMethod = new ClientMethod("getNodesToAttributes",
+ new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request});
+ Collection<GetNodesToAttributesResponse> nodesToAttributesResponses = null;
+ try {
+ nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+ GetNodesToAttributesResponse.class);
+ } catch (Exception ex) {
+ routerMetrics.incrGetNodesToAttributesFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Unable to get nodes to attributes due " +
+ " to exception.", ex);
+ }
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetNodesToAttributesRetrieved(stopTime - startTime);
+ return RouterYarnClientUtils.mergeNodesToAttributesResponse(nodesToAttributesResponses);
}
protected SubClusterId getApplicationHomeSubCluster(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
index d72e72a..e70d552 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
@@ -37,6 +37,9 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -49,9 +52,12 @@
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
+import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -468,5 +474,56 @@
profileResponse.setResource(resource);
return profileResponse;
}
+
+ /**
+ * Merges a list of GetAttributesToNodesResponse.
+ *
+ * @param responses a list of GetAttributesToNodesResponse to merge.
+ * @return the merged GetAttributesToNodesResponse.
+ */
+ public static GetAttributesToNodesResponse mergeAttributesToNodesResponse(
+ Collection<GetAttributesToNodesResponse> responses) {
+ Map<NodeAttributeKey, List<NodeToAttributeValue>> nodeAttributeMap = new HashMap<>();
+ for (GetAttributesToNodesResponse response : responses) {
+ if (response != null && response.getAttributesToNodes() != null) {
+ nodeAttributeMap.putAll(response.getAttributesToNodes());
+ }
+ }
+ return GetAttributesToNodesResponse.newInstance(nodeAttributeMap);
+ }
+
+ /**
+ * Merges a list of GetClusterNodeAttributesResponse.
+ *
+ * @param responses a list of GetClusterNodeAttributesResponse to merge.
+ * @return the merged GetClusterNodeAttributesResponse.
+ */
+ public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesResponse(
+ Collection<GetClusterNodeAttributesResponse> responses) {
+ Set<NodeAttributeInfo> nodeAttributeInfo = new HashSet<>();
+ for (GetClusterNodeAttributesResponse response : responses) {
+ if (response != null && response.getNodeAttributes() != null) {
+ nodeAttributeInfo.addAll(response.getNodeAttributes());
+ }
+ }
+ return GetClusterNodeAttributesResponse.newInstance(nodeAttributeInfo);
+ }
+
+ /**
+ * Merges a list of GetNodesToAttributesResponse.
+ *
+ * @param responses a list of GetNodesToAttributesResponse to merge.
+ * @return the merged GetNodesToAttributesResponse.
+ */
+ public static GetNodesToAttributesResponse mergeNodesToAttributesResponse(
+ Collection<GetNodesToAttributesResponse> responses) {
+ Map<String, Set<NodeAttribute>> attributesMap = new HashMap<>();
+ for (GetNodesToAttributesResponse response : responses) {
+ if (response != null && response.getNodeToAttributes() != null) {
+ attributesMap.putAll(response.getNodeToAttributes());
+ }
+ }
+ return GetNodesToAttributesResponse.newInstance(attributesMap);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 61fcd53..455cb22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -438,6 +438,21 @@
LOG.info("Mocked: failed getResourceProfileFailed call");
metrics.incrGetResourceProfileFailedRetrieved();
}
+
+ public void getAttributesToNodesFailed() {
+ LOG.info("Mocked: failed getAttributesToNodesFailed call");
+ metrics.incrGetAttributesToNodesFailedRetrieved();
+ }
+
+ public void getClusterNodeAttributesFailed() {
+ LOG.info("Mocked: failed getClusterNodeAttributesFailed call");
+ metrics.incrGetClusterNodeAttributesFailedRetrieved();
+ }
+
+ public void getNodesToAttributesFailed() {
+ LOG.info("Mocked: failed getNodesToAttributesFailed call");
+ metrics.incrGetNodesToAttributesFailedRetrieved();
+ }
}
// Records successes for all calls
@@ -573,6 +588,21 @@
LOG.info("Mocked: successful getResourceProfile call with duration {}", duration);
metrics.succeededGetResourceProfileRetrieved(duration);
}
+
+ public void getAttributesToNodesRetrieved(long duration) {
+ LOG.info("Mocked: successful getAttributesToNodes call with duration {}", duration);
+ metrics.succeededGetAttributesToNodesRetrieved(duration);
+ }
+
+ public void getClusterNodeAttributesRetrieved(long duration) {
+ LOG.info("Mocked: successful getClusterNodeAttributes call with duration {}", duration);
+ metrics.succeededGetClusterNodeAttributesRetrieved(duration);
+ }
+
+ public void getNodesToAttributesRetrieved(long duration) {
+ LOG.info("Mocked: successful getNodesToAttributes call with duration {}", duration);
+ metrics.succeededGetNodesToAttributesRetrieved(duration);
+ }
}
@Test
@@ -970,4 +1000,73 @@
Assert.assertEquals(totalBadBefore + 1,
metrics.getResourceProfileFailedRetrieved());
}
+
+ @Test
+ public void testSucceededGetAttributesToNodesRetrieved() {
+ long totalGoodBefore = metrics.getNumSucceededGetAttributesToNodesRetrieved();
+ goodSubCluster.getAttributesToNodesRetrieved(150);
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededGetAttributesToNodesRetrieved());
+ Assert.assertEquals(150,
+ metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA);
+ goodSubCluster.getAttributesToNodesRetrieved(300);
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededGetAttributesToNodesRetrieved());
+ Assert.assertEquals(225,
+ metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA);
+ }
+
+ @Test
+ public void testGetAttributesToNodesRetrievedFailed() {
+ long totalBadBefore = metrics.getAttributesToNodesFailedRetrieved();
+ badSubCluster.getAttributesToNodesFailed();
+ Assert.assertEquals(totalBadBefore + 1,
+ metrics.getAttributesToNodesFailedRetrieved());
+ }
+
+ @Test
+ public void testGetClusterNodeAttributesRetrieved() {
+ long totalGoodBefore = metrics.getNumSucceededGetClusterNodeAttributesRetrieved();
+ goodSubCluster.getClusterNodeAttributesRetrieved(150);
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededGetClusterNodeAttributesRetrieved());
+ Assert.assertEquals(150,
+ metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+ goodSubCluster.getClusterNodeAttributesRetrieved(300);
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededGetClusterNodeAttributesRetrieved());
+ Assert.assertEquals(225,
+ metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+ }
+
+ @Test
+ public void testGetClusterNodeAttributesRetrievedFailed() {
+ long totalBadBefore = metrics.getClusterNodeAttributesFailedRetrieved();
+ badSubCluster.getClusterNodeAttributesFailed();
+ Assert.assertEquals(totalBadBefore + 1,
+ metrics.getClusterNodeAttributesFailedRetrieved());
+ }
+
+ @Test
+ public void testGetNodesToAttributesRetrieved() {
+ long totalGoodBefore = metrics.getNumSucceededGetNodesToAttributesRetrieved();
+ goodSubCluster.getNodesToAttributesRetrieved(150);
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededGetNodesToAttributesRetrieved());
+ Assert.assertEquals(150,
+ metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+ goodSubCluster.getNodesToAttributesRetrieved(300);
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededGetNodesToAttributesRetrieved());
+ Assert.assertEquals(225,
+ metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+ }
+
+ @Test
+ public void testGetNodesToAttributesRetrievedFailed() {
+ long totalBadBefore = metrics.getNodesToAttributesFailedRetrieved();
+ badSubCluster.getNodesToAttributesFailed();
+ Assert.assertEquals(totalBadBefore + 1,
+ metrics.getNodesToAttributesFailedRetrieved());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 49872e5..f0aa480 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -82,6 +82,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -96,6 +102,11 @@
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
+import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -1233,4 +1244,81 @@
Assert.assertEquals(4096, response3.getResource().getMemorySize());
Assert.assertEquals(4, response3.getResource().getVirtualCores());
}
+
+ @Test
+ public void testGetAttributesToNodes() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Get AttributesToNodes request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class, "Missing getAttributesToNodes request " +
+ "or nodeAttributes.", () -> interceptor.getAttributesToNodes(null));
+
+ // normal request
+ GetAttributesToNodesResponse response =
+ interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance());
+
+ Assert.assertNotNull(response);
+ Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
+ Assert.assertNotNull(attrs);
+ Assert.assertEquals(4, attrs.size());
+
+ NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+ NodeAttributeType.STRING, "nvidia");
+ NodeToAttributeValue attributeValue1 =
+ NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue());
+ NodeAttributeKey gpuKey = gpu.getAttributeKey();
+ Assert.assertTrue(attrs.get(gpuKey).contains(attributeValue1));
+ }
+
+ @Test
+ public void testClusterNodeAttributes() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Get ClusterNodeAttributes request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeAttributes request.",
+ () -> interceptor.getClusterNodeAttributes(null));
+
+ // normal request
+ GetClusterNodeAttributesResponse response =
+ interceptor.getClusterNodeAttributes(GetClusterNodeAttributesRequest.newInstance());
+
+ Assert.assertNotNull(response);
+ Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
+ Assert.assertNotNull(nodeAttributeInfos);
+ Assert.assertEquals(4, nodeAttributeInfos.size());
+
+ NodeAttributeInfo nodeAttributeInfo1 =
+ NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
+ NodeAttributeType.STRING);
+ Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
+
+ NodeAttributeInfo nodeAttributeInfo2 =
+ NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("OS"),
+ NodeAttributeType.STRING);
+ Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
+ }
+
+ @Test
+ public void testNodesToAttributes() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing getNodesToAttributes request or hostNames.",
+ () -> interceptor.getNodesToAttributes(null));
+
+ // normal request
+ Set<String> hostNames = Collections.singleton("0-host1");
+ GetNodesToAttributesResponse response =
+ interceptor.getNodesToAttributes(GetNodesToAttributesRequest.newInstance(hostNames));
+ Assert.assertNotNull(response);
+
+ Map<String, Set<NodeAttribute>> nodeAttributeMap = response.getNodeToAttributes();
+ Assert.assertNotNull(nodeAttributeMap);
+ Assert.assertEquals(1, nodeAttributeMap.size());
+
+ NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+ NodeAttributeType.STRING, "nvida");
+ Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java
index d586c48..33cae61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java
@@ -37,6 +37,9 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -53,6 +56,11 @@
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
+import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
+import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert;
@@ -610,4 +618,147 @@
Assert.assertEquals(3, resource.getVirtualCores());
Assert.assertEquals(3072, resource.getMemorySize());
}
+
+ @Test
+ public void testMergeAttributesToNodesResponse() {
+ // normal response1
+ NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+ NodeAttributeType.STRING, "nvidia");
+ Map<NodeAttributeKey, List<NodeToAttributeValue>> map1 = new HashMap<>();
+ List<NodeToAttributeValue> lists1 = new ArrayList<>();
+ NodeToAttributeValue attributeValue1 =
+ NodeToAttributeValue.newInstance("node1", gpu.getAttributeValue());
+ lists1.add(attributeValue1);
+ map1.put(gpu.getAttributeKey(), lists1);
+ GetAttributesToNodesResponse response1 = GetAttributesToNodesResponse.newInstance(map1);
+
+ // normal response2
+ NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
+ NodeAttributeType.STRING, "docker0");
+ Map<NodeAttributeKey, List<NodeToAttributeValue>> map2 = new HashMap<>();
+ List<NodeToAttributeValue> lists2 = new ArrayList<>();
+ NodeToAttributeValue attributeValue2 =
+ NodeToAttributeValue.newInstance("node2", docker.getAttributeValue());
+ lists2.add(attributeValue2);
+ map2.put(docker.getAttributeKey(), lists2);
+ GetAttributesToNodesResponse response2 = GetAttributesToNodesResponse.newInstance(map2);
+
+ // empty response3
+ GetAttributesToNodesResponse response3 =
+ GetAttributesToNodesResponse.newInstance(new HashMap<>());
+
+ // null response4
+ GetAttributesToNodesResponse response4 = null;
+
+ List<GetAttributesToNodesResponse> responses = new ArrayList<>();
+ responses.add(response1);
+ responses.add(response2);
+ responses.add(response3);
+ responses.add(response4);
+
+ GetAttributesToNodesResponse response =
+ RouterYarnClientUtils.mergeAttributesToNodesResponse(responses);
+
+ Assert.assertNotNull(response);
+ Assert.assertEquals(2, response.getAttributesToNodes().size());
+
+ Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
+
+ NodeAttributeKey gpuKey = gpu.getAttributeKey();
+ Assert.assertEquals(attributeValue1.toString(), attrs.get(gpuKey).get(0).toString());
+
+ NodeAttributeKey dockerKey = docker.getAttributeKey();
+ Assert.assertEquals(attributeValue2.toString(), attrs.get(dockerKey).get(0).toString());
+ }
+
+ @Test
+ public void testMergeClusterNodeAttributesResponse() {
+ // normal response1
+ NodeAttributeInfo nodeAttributeInfo1 =
+ NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
+ NodeAttributeType.STRING);
+ Set<NodeAttributeInfo> attributes1 = new HashSet<>();
+ attributes1.add(nodeAttributeInfo1);
+ GetClusterNodeAttributesResponse response1 =
+ GetClusterNodeAttributesResponse.newInstance(attributes1);
+
+ // normal response2
+ NodeAttributeInfo nodeAttributeInfo2 =
+ NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("CPU"),
+ NodeAttributeType.STRING);
+ Set<NodeAttributeInfo> attributes2 = new HashSet<>();
+ attributes2.add(nodeAttributeInfo2);
+ GetClusterNodeAttributesResponse response2 =
+ GetClusterNodeAttributesResponse.newInstance(attributes2);
+
+ // empty response3
+ GetClusterNodeAttributesResponse response3 =
+ GetClusterNodeAttributesResponse.newInstance(new HashSet<>());
+
+ // null response4
+ GetClusterNodeAttributesResponse response4 = null;
+
+ List<GetClusterNodeAttributesResponse> responses = new ArrayList<>();
+ responses.add(response1);
+ responses.add(response2);
+ responses.add(response3);
+ responses.add(response4);
+
+ GetClusterNodeAttributesResponse response =
+ RouterYarnClientUtils.mergeClusterNodeAttributesResponse(responses);
+
+ Assert.assertNotNull(response);
+
+ Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
+ Assert.assertEquals(2, nodeAttributeInfos.size());
+ Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
+ Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
+ }
+
+ @Test
+ public void testMergeNodesToAttributesResponse() {
+ // normal response1
+ NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+ NodeAttributeType.STRING, "nvida");
+ NodeAttribute os = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
+ NodeAttributeType.STRING, "windows64");
+ NodeAttribute dist = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
+ NodeAttributeType.STRING, "3_0_2");
+ Map<String, Set<NodeAttribute>> node1Map = new HashMap<>();
+ node1Map.put("node1", ImmutableSet.of(gpu, os, dist));
+ GetNodesToAttributesResponse response1 = GetNodesToAttributesResponse.newInstance(node1Map);
+
+ // normal response2
+ NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
+ NodeAttributeType.STRING, "docker0");
+ Map<String, Set<NodeAttribute>> node2Map = new HashMap<>();
+ node2Map.put("node2", ImmutableSet.of(docker));
+ GetNodesToAttributesResponse response2 = GetNodesToAttributesResponse.newInstance(node2Map);
+
+ // empty response3
+ GetNodesToAttributesResponse response3 =
+ GetNodesToAttributesResponse.newInstance(new HashMap<>());
+
+ // null response4
+ GetNodesToAttributesResponse response4 = null;
+
+ List<GetNodesToAttributesResponse> responses = new ArrayList<>();
+ responses.add(response1);
+ responses.add(response2);
+ responses.add(response3);
+ responses.add(response4);
+
+ GetNodesToAttributesResponse response =
+ RouterYarnClientUtils.mergeNodesToAttributesResponse(responses);
+
+ Assert.assertNotNull(response);
+
+ Map<String, Set<NodeAttribute>> hostToAttrs = response.getNodeToAttributes();
+ Assert.assertNotNull(hostToAttrs);
+ Assert.assertEquals(2, hostToAttrs.size());
+ Assert.assertTrue(hostToAttrs.get("node1").contains(dist));
+ Assert.assertTrue(hostToAttrs.get("node1").contains(gpu));
+ Assert.assertTrue(hostToAttrs.get("node1").contains(os));
+ Assert.assertTrue(hostToAttrs.get("node2").contains(docker));
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index af1f459..7c82476 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -24,12 +24,19 @@
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -82,6 +89,7 @@
}
mockRMs.put(subClusterId, mockRM);
}
+ initNodeAttributes(subClusterId, mockRM);
return mockRM.getClientRMService();
}
}
@@ -127,4 +135,30 @@
public ConcurrentHashMap<SubClusterId, MockNM> getMockNMs() {
return mockNMs;
}
+
+ private void initNodeAttributes(SubClusterId subClusterId, MockRM mockRM) {
+ String node1 = subClusterId.getId() +"-host1";
+ String node2 = subClusterId.getId() +"-host2";
+ NodeAttributesManager mgr = mockRM.getRMContext().getNodeAttributesManager();
+ NodeAttribute gpu =
+ NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+ NodeAttributeType.STRING, "nvidia");
+ NodeAttribute os =
+ NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
+ NodeAttributeType.STRING, "windows64");
+ NodeAttribute docker =
+ NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
+ NodeAttributeType.STRING, "docker0");
+ NodeAttribute dist =
+ NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
+ NodeAttributeType.STRING, "3_0_2");
+ Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
+ nodes.put(node1, ImmutableSet.of(gpu, os, dist));
+ nodes.put(node2, ImmutableSet.of(docker, dist));
+ try {
+ mgr.addNodeAttributes(nodes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}