YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation (#4610)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 42a2260..d6ce729 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -91,6 +91,12 @@
   private MutableGaugeInt numGetResourceProfilesFailedRetrieved;
   @Metric("# of getResourceProfile failed to be retrieved")
   private MutableGaugeInt numGetResourceProfileFailedRetrieved;
+  @Metric("# of getAttributesToNodes failed to be retrieved")
+  private MutableGaugeInt numGetAttributesToNodesFailedRetrieved;
+  @Metric("# of getClusterNodeAttributes failed to be retrieved")
+  private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved;
+  @Metric("# of getNodesToAttributes failed to be retrieved")
+  private MutableGaugeInt numGetNodesToAttributesFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -101,14 +107,11 @@
   private MutableRate totalSucceededAppsCreated;
   @Metric("Total number of successful Retrieved app reports and latency(ms)")
   private MutableRate totalSucceededAppsRetrieved;
-  @Metric("Total number of successful Retrieved multiple apps reports and "
-      + "latency(ms)")
+  @Metric("Total number of successful Retrieved multiple apps reports and latency(ms)")
   private MutableRate totalSucceededMultipleAppsRetrieved;
-  @Metric("Total number of successful Retrieved " +
-          "appAttempt reports and latency(ms)")
+  @Metric("Total number of successful Retrieved appAttempt reports and latency(ms)")
   private MutableRate totalSucceededAppAttemptsRetrieved;
-  @Metric("Total number of successful Retrieved getClusterMetrics and "
-      + "latency(ms)")
+  @Metric("Total number of successful Retrieved getClusterMetrics and latency(ms)")
   private MutableRate totalSucceededGetClusterMetricsRetrieved;
   @Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
   private MutableRate totalSucceededGetClusterNodesRetrieved;
@@ -144,9 +147,14 @@
   private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
   @Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)")
   private MutableRate totalSucceededGetResourceProfilesRetrieved;
-
   @Metric("Total number of successful Retrieved getResourceProfile and latency(ms)")
   private MutableRate totalSucceededGetResourceProfileRetrieved;
+  @Metric("Total number of successful Retrieved getAttributesToNodes and latency(ms)")
+  private MutableRate totalSucceededGetAttributesToNodesRetrieved;
+  @Metric("Total number of successful Retrieved getClusterNodeAttributes and latency(ms)")
+  private MutableRate totalSucceededGetClusterNodeAttributesRetrieved;
+  @Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)")
+  private MutableRate totalSucceededGetNodesToAttributesRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -176,6 +184,10 @@
   private MutableQuantiles moveApplicationAcrossQueuesLatency;
   private MutableQuantiles getResourceProfilesLatency;
   private MutableQuantiles getResourceProfileLatency;
+  private MutableQuantiles getAttributesToNodesLatency;
+  private MutableQuantiles getClusterNodeAttributesLatency;
+
+  private MutableQuantiles getNodesToAttributesLatency;
 
   private static volatile RouterMetrics instance = null;
   private static MetricsRegistry registry;
@@ -274,6 +286,18 @@
     getResourceProfileLatency =
         registry.newQuantiles("getResourceProfileLatency",
             "latency of get resource profile timeouts", "ops", "latency", 10);
+
+    getAttributesToNodesLatency =
+        registry.newQuantiles("getAttributesToNodesLatency",
+            "latency of get attributes to nodes timeouts", "ops", "latency", 10);
+
+    getClusterNodeAttributesLatency =
+        registry.newQuantiles("getClusterNodeAttributesLatency",
+            "latency of get cluster node attributes timeouts", "ops", "latency", 10);
+
+    getNodesToAttributesLatency =
+        registry.newQuantiles("getNodesToAttributesLatency",
+            "latency of get nodes to attributes timeouts", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -421,6 +445,21 @@
   }
 
   @VisibleForTesting
+  public long getNumSucceededGetAttributesToNodesRetrieved() {
+    return totalSucceededGetAttributesToNodesRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededGetClusterNodeAttributesRetrieved() {
+    return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededGetNodesToAttributesRetrieved() {
+    return totalSucceededGetNodesToAttributesRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
   public double getLatencySucceededAppsCreated() {
     return totalSucceededAppsCreated.lastStat().mean();
   }
@@ -546,6 +585,21 @@
   }
 
   @VisibleForTesting
+  public double getLatencySucceededGetAttributesToNodesRetrieved() {
+    return totalSucceededGetAttributesToNodesRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededGetClusterNodeAttributesRetrieved() {
+    return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededGetNodesToAttributesRetrieved() {
+    return totalSucceededGetNodesToAttributesRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
   public int getAppsFailedCreated() {
     return numAppsFailedCreated.value();
   }
@@ -666,6 +720,18 @@
     return numGetResourceProfileFailedRetrieved.value();
   }
 
+  public int getAttributesToNodesFailedRetrieved() {
+    return numGetAttributesToNodesFailedRetrieved.value();
+  }
+
+  public int getClusterNodeAttributesFailedRetrieved() {
+    return numGetClusterNodeAttributesFailedRetrieved.value();
+  }
+
+  public int getNodesToAttributesFailedRetrieved() {
+    return numGetNodesToAttributesFailedRetrieved.value();
+  }
+
   public void succeededAppsCreated(long duration) {
     totalSucceededAppsCreated.add(duration);
     getNewApplicationLatency.add(duration);
@@ -791,6 +857,21 @@
     getResourceProfileLatency.add(duration);
   }
 
+  public void succeededGetAttributesToNodesRetrieved(long duration) {
+    totalSucceededGetAttributesToNodesRetrieved.add(duration);
+    getAttributesToNodesLatency.add(duration);
+  }
+
+  public void succeededGetClusterNodeAttributesRetrieved(long duration) {
+    totalSucceededGetClusterNodeAttributesRetrieved.add(duration);
+    getClusterNodeAttributesLatency.add(duration);
+  }
+
+  public void succeededGetNodesToAttributesRetrieved(long duration) {
+    totalSucceededGetNodesToAttributesRetrieved.add(duration);
+    getNodesToAttributesLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -890,4 +971,16 @@
   public void incrGetResourceProfileFailedRetrieved() {
     numGetResourceProfileFailedRetrieved.incr();
   }
+
+  public void incrGetAttributesToNodesFailedRetrieved() {
+    numGetAttributesToNodesFailedRetrieved.incr();
+  }
+
+  public void incrGetClusterNodeAttributesFailedRetrieved() {
+    numGetClusterNodeAttributesFailedRetrieved.incr();
+  }
+
+  public void incrGetNodesToAttributesFailedRetrieved() {
+    numGetNodesToAttributesFailedRetrieved.incr();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 45cec64..7fd1003 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -175,7 +175,6 @@
     federationFacade = FederationStateStoreFacade.getInstance();
     rand = new Random(System.currentTimeMillis());
 
-
     int numThreads = getConf().getInt(
         YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
         YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
@@ -195,12 +194,11 @@
       LOG.error(e.getMessage());
     }
 
-    numSubmitRetries =
-        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
-            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+    numSubmitRetries = conf.getInt(
+        YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+        YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
 
-    clientRMProxies =
-        new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
+    clientRMProxies = new ConcurrentHashMap<>();
     routerMetrics = RouterMetrics.getMetrics();
 
     returnPartialReport = conf.getBoolean(
@@ -227,19 +225,17 @@
     ApplicationClientProtocol clientRMProxy = null;
     try {
       boolean serviceAuthEnabled = getConf().getBoolean(
-              CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
+          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
       UserGroupInformation realUser = user;
       if (serviceAuthEnabled) {
-        realUser = UserGroupInformation.createProxyUser(
-                user.getShortUserName(), UserGroupInformation.getLoginUser());
+        realUser = UserGroupInformation.createProxyUser(user.getShortUserName(),
+            UserGroupInformation.getLoginUser());
       }
       clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
           ApplicationClientProtocol.class, subClusterId, realUser);
     } catch (Exception e) {
       RouterServerUtil.logAndThrowException(
-          "Unable to create the interface to reach the SubCluster "
-              + subClusterId,
-          e);
+          "Unable to create the interface to reach the SubCluster " + subClusterId, e);
     }
 
     clientRMProxies.put(subClusterId, clientRMProxy);
@@ -287,8 +283,7 @@
 
     for (int i = 0; i < numSubmitRetries; ++i) {
       SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
-      LOG.debug(
-          "getNewApplication try #{} on SubCluster {}", i, subClusterId);
+      LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
       ApplicationClientProtocol clientRMProxy =
           getClientRMProxyForSubCluster(subClusterId);
       GetNewApplicationResponse response = null;
@@ -410,7 +405,7 @@
     ApplicationId applicationId =
         request.getApplicationSubmissionContext().getApplicationId();
 
-    List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
+    List<SubClusterId> blacklist = new ArrayList<>();
 
     for (int i = 0; i < numSubmitRetries; ++i) {
 
@@ -561,8 +556,8 @@
     }
 
     if (response == null) {
-      LOG.error("No response when attempting to kill the application "
-          + applicationId + " to SubCluster " + subClusterId.getId());
+      LOG.error("No response when attempting to kill the application {} to SubCluster {}.",
+          applicationId, subClusterId.getId());
     }
 
     long stopTime = clock.getTime();
@@ -1015,7 +1010,7 @@
     }
     long startTime = clock.getTime();
     ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
-         new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
+        new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
     Collection<GetLabelsToNodesResponse> labelNodes;
     try {
       labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
@@ -1040,7 +1035,7 @@
     }
     long startTime = clock.getTime();
     ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
-         new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
+        new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
     Collection<GetClusterNodeLabelsResponse> nodeLabels;
     try {
       nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
@@ -1528,20 +1523,75 @@
   @Override
   public GetAttributesToNodesResponse getAttributesToNodes(
       GetAttributesToNodesRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    if (request == null || request.getNodeAttributes() == null) {
+      routerMetrics.incrGetAttributesToNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing getAttributesToNodes request " +
+          "or nodeAttributes.", null);
+    }
+    long startTime = clock.getTime();
+    ClientMethod remoteMethod = new ClientMethod("getAttributesToNodes",
+        new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request});
+    Collection<GetAttributesToNodesResponse> attributesToNodesResponses = null;
+    try {
+      attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+          GetAttributesToNodesResponse.class);
+    } catch (Exception ex) {
+      routerMetrics.incrGetAttributesToNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.",
+          ex);
+    }
+    long stopTime = clock.getTime();
+    routerMetrics.succeededGetAttributesToNodesRetrieved(stopTime - startTime);
+    return RouterYarnClientUtils.mergeAttributesToNodesResponse(attributesToNodesResponses);
   }
 
   @Override
   public GetClusterNodeAttributesResponse getClusterNodeAttributes(
-      GetClusterNodeAttributesRequest request)
-      throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+      GetClusterNodeAttributesRequest request) throws YarnException, IOException {
+    if (request == null) {
+      routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing getClusterNodeAttributes request.", null);
+    }
+    long startTime = clock.getTime();
+    ClientMethod remoteMethod = new ClientMethod("getClusterNodeAttributes",
+        new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request});
+    Collection<GetClusterNodeAttributesResponse> clusterNodeAttributesResponses = null;
+    try {
+      clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+          GetClusterNodeAttributesResponse.class);
+    } catch (Exception ex) {
+      routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to get cluster node attributes due " +
+          " to exception.", ex);
+    }
+    long stopTime = clock.getTime();
+    routerMetrics.succeededGetClusterNodeAttributesRetrieved(stopTime - startTime);
+    return RouterYarnClientUtils.mergeClusterNodeAttributesResponse(clusterNodeAttributesResponses);
   }
 
   @Override
   public GetNodesToAttributesResponse getNodesToAttributes(
       GetNodesToAttributesRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    if (request == null || request.getHostNames() == null) {
+      routerMetrics.incrGetNodesToAttributesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing getNodesToAttributes request or " +
+          "hostNames.", null);
+    }
+    long startTime = clock.getTime();
+    ClientMethod remoteMethod = new ClientMethod("getNodesToAttributes",
+        new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request});
+    Collection<GetNodesToAttributesResponse> nodesToAttributesResponses = null;
+    try {
+      nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+          GetNodesToAttributesResponse.class);
+    } catch (Exception ex) {
+      routerMetrics.incrGetNodesToAttributesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to get nodes to attributes due " +
+          " to exception.", ex);
+    }
+    long stopTime = clock.getTime();
+    routerMetrics.succeededGetNodesToAttributesRetrieved(stopTime - startTime);
+    return RouterYarnClientUtils.mergeNodesToAttributesResponse(nodesToAttributesResponses);
   }
 
   protected SubClusterId getApplicationHomeSubCluster(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
index d72e72a..e70d552 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
@@ -37,6 +37,9 @@
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -49,9 +52,12 @@
 import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
+import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -468,5 +474,56 @@
     profileResponse.setResource(resource);
     return profileResponse;
   }
+
+  /**
+   * Merges a list of GetAttributesToNodesResponse.
+   *
+   * @param responses a list of GetAttributesToNodesResponse to merge.
+   * @return the merged GetAttributesToNodesResponse.
+   */
+  public static GetAttributesToNodesResponse mergeAttributesToNodesResponse(
+      Collection<GetAttributesToNodesResponse> responses) {
+    Map<NodeAttributeKey, List<NodeToAttributeValue>> nodeAttributeMap = new HashMap<>();
+    for (GetAttributesToNodesResponse response : responses) {
+      if (response != null && response.getAttributesToNodes() != null) {
+        nodeAttributeMap.putAll(response.getAttributesToNodes());
+      }
+    }
+    return GetAttributesToNodesResponse.newInstance(nodeAttributeMap);
+  }
+
+  /**
+   * Merges a list of GetClusterNodeAttributesResponse.
+   *
+   * @param responses a list of GetClusterNodeAttributesResponse to merge.
+   * @return the merged GetClusterNodeAttributesResponse.
+   */
+  public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesResponse(
+      Collection<GetClusterNodeAttributesResponse> responses) {
+    Set<NodeAttributeInfo> nodeAttributeInfo = new HashSet<>();
+    for (GetClusterNodeAttributesResponse response : responses) {
+      if (response != null && response.getNodeAttributes() != null) {
+        nodeAttributeInfo.addAll(response.getNodeAttributes());
+      }
+    }
+    return GetClusterNodeAttributesResponse.newInstance(nodeAttributeInfo);
+  }
+
+  /**
+   * Merges a list of GetNodesToAttributesResponse.
+   *
+   * @param responses a list of GetNodesToAttributesResponse to merge.
+   * @return the merged GetNodesToAttributesResponse.
+   */
+  public static GetNodesToAttributesResponse mergeNodesToAttributesResponse(
+      Collection<GetNodesToAttributesResponse> responses) {
+    Map<String, Set<NodeAttribute>> attributesMap = new HashMap<>();
+    for (GetNodesToAttributesResponse response : responses) {
+      if (response != null && response.getNodeToAttributes() != null) {
+        attributesMap.putAll(response.getNodeToAttributes());
+      }
+    }
+    return GetNodesToAttributesResponse.newInstance(attributesMap);
+  }
 }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 61fcd53..455cb22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -438,6 +438,21 @@
       LOG.info("Mocked: failed getResourceProfileFailed call");
       metrics.incrGetResourceProfileFailedRetrieved();
     }
+
+    public void getAttributesToNodesFailed() {
+      LOG.info("Mocked: failed getAttributesToNodesFailed call");
+      metrics.incrGetAttributesToNodesFailedRetrieved();
+    }
+
+    public void getClusterNodeAttributesFailed() {
+      LOG.info("Mocked: failed getClusterNodeAttributesFailed call");
+      metrics.incrGetClusterNodeAttributesFailedRetrieved();
+    }
+
+    public void getNodesToAttributesFailed() {
+      LOG.info("Mocked: failed getNodesToAttributesFailed call");
+      metrics.incrGetNodesToAttributesFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -573,6 +588,21 @@
       LOG.info("Mocked: successful getResourceProfile call with duration {}", duration);
       metrics.succeededGetResourceProfileRetrieved(duration);
     }
+
+    public void getAttributesToNodesRetrieved(long duration) {
+      LOG.info("Mocked: successful getAttributesToNodes call with duration {}", duration);
+      metrics.succeededGetAttributesToNodesRetrieved(duration);
+    }
+
+    public void getClusterNodeAttributesRetrieved(long duration) {
+      LOG.info("Mocked: successful getClusterNodeAttributes call with duration {}", duration);
+      metrics.succeededGetClusterNodeAttributesRetrieved(duration);
+    }
+
+    public void getNodesToAttributesRetrieved(long duration) {
+      LOG.info("Mocked: successful getNodesToAttributes call with duration {}", duration);
+      metrics.succeededGetNodesToAttributesRetrieved(duration);
+    }
   }
 
   @Test
@@ -970,4 +1000,73 @@
     Assert.assertEquals(totalBadBefore + 1,
         metrics.getResourceProfileFailedRetrieved());
   }
+
+  @Test
+  public void testSucceededGetAttributesToNodesRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededGetAttributesToNodesRetrieved();
+    goodSubCluster.getAttributesToNodesRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededGetAttributesToNodesRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getAttributesToNodesRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededGetAttributesToNodesRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetAttributesToNodesRetrievedFailed() {
+    long totalBadBefore = metrics.getAttributesToNodesFailedRetrieved();
+    badSubCluster.getAttributesToNodesFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getAttributesToNodesFailedRetrieved());
+  }
+
+  @Test
+  public void testGetClusterNodeAttributesRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededGetClusterNodeAttributesRetrieved();
+    goodSubCluster.getClusterNodeAttributesRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededGetClusterNodeAttributesRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getClusterNodeAttributesRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededGetClusterNodeAttributesRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetClusterNodeAttributesRetrievedFailed() {
+    long totalBadBefore = metrics.getClusterNodeAttributesFailedRetrieved();
+    badSubCluster.getClusterNodeAttributesFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getClusterNodeAttributesFailedRetrieved());
+  }
+
+  @Test
+  public void testGetNodesToAttributesRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededGetNodesToAttributesRetrieved();
+    goodSubCluster.getNodesToAttributesRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededGetNodesToAttributesRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getNodesToAttributesRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededGetNodesToAttributesRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetNodesToAttributesRetrievedFailed() {
+    long totalBadBefore = metrics.getNodesToAttributesFailedRetrieved();
+    badSubCluster.getNodesToAttributesFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getNodesToAttributesFailedRetrieved());
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 49872e5..f0aa480 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -82,6 +82,12 @@
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -96,6 +102,11 @@
 import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
+import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -1233,4 +1244,81 @@
     Assert.assertEquals(4096, response3.getResource().getMemorySize());
     Assert.assertEquals(4, response3.getResource().getVirtualCores());
   }
+
+  @Test
+  public void testGetAttributesToNodes() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get AttributesToNodes request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class, "Missing getAttributesToNodes request " +
+        "or nodeAttributes.", () -> interceptor.getAttributesToNodes(null));
+
+    // normal request
+    GetAttributesToNodesResponse response =
+        interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance());
+
+    Assert.assertNotNull(response);
+    Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
+    Assert.assertNotNull(attrs);
+    Assert.assertEquals(4, attrs.size());
+
+    NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+        NodeAttributeType.STRING, "nvidia");
+    NodeToAttributeValue attributeValue1 =
+        NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue());
+    NodeAttributeKey gpuKey = gpu.getAttributeKey();
+    Assert.assertTrue(attrs.get(gpuKey).contains(attributeValue1));
+  }
+
+  @Test
+  public void testClusterNodeAttributes() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get ClusterNodeAttributes request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeAttributes request.",
+        () -> interceptor.getClusterNodeAttributes(null));
+
+    // normal request
+    GetClusterNodeAttributesResponse response =
+        interceptor.getClusterNodeAttributes(GetClusterNodeAttributesRequest.newInstance());
+
+    Assert.assertNotNull(response);
+    Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
+    Assert.assertNotNull(nodeAttributeInfos);
+    Assert.assertEquals(4, nodeAttributeInfos.size());
+
+    NodeAttributeInfo nodeAttributeInfo1 =
+        NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
+        NodeAttributeType.STRING);
+    Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
+
+    NodeAttributeInfo nodeAttributeInfo2 =
+        NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("OS"),
+        NodeAttributeType.STRING);
+    Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
+  }
+
+  @Test
+  public void testNodesToAttributes() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getNodesToAttributes request or hostNames.",
+        () -> interceptor.getNodesToAttributes(null));
+
+    // normal request
+    Set<String> hostNames = Collections.singleton("0-host1");
+    GetNodesToAttributesResponse response =
+        interceptor.getNodesToAttributes(GetNodesToAttributesRequest.newInstance(hostNames));
+    Assert.assertNotNull(response);
+
+    Map<String, Set<NodeAttribute>> nodeAttributeMap = response.getNodeToAttributes();
+    Assert.assertNotNull(nodeAttributeMap);
+    Assert.assertEquals(1, nodeAttributeMap.size());
+
+    NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+        NodeAttributeType.STRING, "nvida");
+    Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java
index d586c48..33cae61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java
@@ -37,6 +37,9 @@
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -53,6 +56,11 @@
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
+import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
+import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
 import org.junit.Assert;
@@ -610,4 +618,147 @@
     Assert.assertEquals(3, resource.getVirtualCores());
     Assert.assertEquals(3072, resource.getMemorySize());
   }
+
+  @Test
+  public void testMergeAttributesToNodesResponse() {
+    // normal response1
+    NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+        NodeAttributeType.STRING, "nvidia");
+    Map<NodeAttributeKey, List<NodeToAttributeValue>> map1 = new HashMap<>();
+    List<NodeToAttributeValue> lists1 = new ArrayList<>();
+    NodeToAttributeValue attributeValue1 =
+        NodeToAttributeValue.newInstance("node1", gpu.getAttributeValue());
+    lists1.add(attributeValue1);
+    map1.put(gpu.getAttributeKey(), lists1);
+    GetAttributesToNodesResponse response1 = GetAttributesToNodesResponse.newInstance(map1);
+
+    // normal response2
+    NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
+        NodeAttributeType.STRING, "docker0");
+    Map<NodeAttributeKey, List<NodeToAttributeValue>> map2 = new HashMap<>();
+    List<NodeToAttributeValue> lists2 = new ArrayList<>();
+    NodeToAttributeValue attributeValue2 =
+        NodeToAttributeValue.newInstance("node2", docker.getAttributeValue());
+    lists2.add(attributeValue2);
+    map2.put(docker.getAttributeKey(), lists2);
+    GetAttributesToNodesResponse response2 = GetAttributesToNodesResponse.newInstance(map2);
+
+    // empty response3
+    GetAttributesToNodesResponse response3 =
+        GetAttributesToNodesResponse.newInstance(new HashMap<>());
+
+    // null response4
+    GetAttributesToNodesResponse response4 = null;
+
+    List<GetAttributesToNodesResponse> responses = new ArrayList<>();
+    responses.add(response1);
+    responses.add(response2);
+    responses.add(response3);
+    responses.add(response4);
+
+    GetAttributesToNodesResponse response =
+        RouterYarnClientUtils.mergeAttributesToNodesResponse(responses);
+
+    Assert.assertNotNull(response);
+    Assert.assertEquals(2, response.getAttributesToNodes().size());
+
+    Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
+
+    NodeAttributeKey gpuKey = gpu.getAttributeKey();
+    Assert.assertEquals(attributeValue1.toString(), attrs.get(gpuKey).get(0).toString());
+
+    NodeAttributeKey dockerKey = docker.getAttributeKey();
+    Assert.assertEquals(attributeValue2.toString(), attrs.get(dockerKey).get(0).toString());
+  }
+
+  @Test
+  public void testMergeClusterNodeAttributesResponse() {
+    // normal response1
+    NodeAttributeInfo nodeAttributeInfo1 =
+        NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
+        NodeAttributeType.STRING);
+    Set<NodeAttributeInfo> attributes1 = new HashSet<>();
+    attributes1.add(nodeAttributeInfo1);
+    GetClusterNodeAttributesResponse response1 =
+        GetClusterNodeAttributesResponse.newInstance(attributes1);
+
+    // normal response2
+    NodeAttributeInfo nodeAttributeInfo2 =
+        NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("CPU"),
+        NodeAttributeType.STRING);
+    Set<NodeAttributeInfo> attributes2 = new HashSet<>();
+    attributes2.add(nodeAttributeInfo2);
+    GetClusterNodeAttributesResponse response2 =
+        GetClusterNodeAttributesResponse.newInstance(attributes2);
+
+    // empty response3
+    GetClusterNodeAttributesResponse response3 =
+        GetClusterNodeAttributesResponse.newInstance(new HashSet<>());
+
+    // null response4
+    GetClusterNodeAttributesResponse response4 = null;
+
+    List<GetClusterNodeAttributesResponse> responses = new ArrayList<>();
+    responses.add(response1);
+    responses.add(response2);
+    responses.add(response3);
+    responses.add(response4);
+
+    GetClusterNodeAttributesResponse response =
+        RouterYarnClientUtils.mergeClusterNodeAttributesResponse(responses);
+
+    Assert.assertNotNull(response);
+
+    Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
+    Assert.assertEquals(2, nodeAttributeInfos.size());
+    Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
+    Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
+  }
+
+  @Test
+  public void testMergeNodesToAttributesResponse() {
+    // normal response1
+    NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+        NodeAttributeType.STRING, "nvida");
+    NodeAttribute os = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
+        NodeAttributeType.STRING, "windows64");
+    NodeAttribute dist = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
+        NodeAttributeType.STRING, "3_0_2");
+    Map<String, Set<NodeAttribute>> node1Map = new HashMap<>();
+    node1Map.put("node1", ImmutableSet.of(gpu, os, dist));
+    GetNodesToAttributesResponse response1 = GetNodesToAttributesResponse.newInstance(node1Map);
+
+    // normal response2
+    NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
+        NodeAttributeType.STRING, "docker0");
+    Map<String, Set<NodeAttribute>> node2Map = new HashMap<>();
+    node2Map.put("node2", ImmutableSet.of(docker));
+    GetNodesToAttributesResponse response2 = GetNodesToAttributesResponse.newInstance(node2Map);
+
+    // empty response3
+    GetNodesToAttributesResponse response3 =
+        GetNodesToAttributesResponse.newInstance(new HashMap<>());
+
+    // null response4
+    GetNodesToAttributesResponse response4 = null;
+
+    List<GetNodesToAttributesResponse> responses = new ArrayList<>();
+    responses.add(response1);
+    responses.add(response2);
+    responses.add(response3);
+    responses.add(response4);
+
+    GetNodesToAttributesResponse response =
+        RouterYarnClientUtils.mergeNodesToAttributesResponse(responses);
+
+    Assert.assertNotNull(response);
+
+    Map<String, Set<NodeAttribute>> hostToAttrs = response.getNodeToAttributes();
+    Assert.assertNotNull(hostToAttrs);
+    Assert.assertEquals(2, hostToAttrs.size());
+    Assert.assertTrue(hostToAttrs.get("node1").contains(dist));
+    Assert.assertTrue(hostToAttrs.get("node1").contains(gpu));
+    Assert.assertTrue(hostToAttrs.get("node1").contains(os));
+    Assert.assertTrue(hostToAttrs.get("node2").contains(docker));
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index af1f459..7c82476 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -24,12 +24,19 @@
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -82,6 +89,7 @@
         }
         mockRMs.put(subClusterId, mockRM);
       }
+      initNodeAttributes(subClusterId, mockRM);
       return mockRM.getClientRMService();
     }
   }
@@ -127,4 +135,30 @@
   public ConcurrentHashMap<SubClusterId, MockNM> getMockNMs() {
     return mockNMs;
   }
+
+  private void initNodeAttributes(SubClusterId subClusterId, MockRM mockRM)  {
+    String node1 = subClusterId.getId() +"-host1";
+    String node2 = subClusterId.getId() +"-host2";
+    NodeAttributesManager mgr = mockRM.getRMContext().getNodeAttributesManager();
+    NodeAttribute gpu =
+        NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
+        NodeAttributeType.STRING, "nvidia");
+    NodeAttribute os =
+        NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
+        NodeAttributeType.STRING, "windows64");
+    NodeAttribute docker =
+        NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
+        NodeAttributeType.STRING, "docker0");
+    NodeAttribute dist =
+        NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
+        NodeAttributeType.STRING, "3_0_2");
+    Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
+    nodes.put(node1, ImmutableSet.of(gpu, os, dist));
+    nodes.put(node2, ImmutableSet.of(docker, dist));
+    try {
+      mgr.addNodeAttributes(nodes);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }