YARN-11235. Refactor Policy Code and Define getReservationHomeSubcluster (#4656)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
index 9b795b0..fc8cc5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
@@ -20,7 +20,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
 import java.util.Random;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -188,8 +188,8 @@
    * @throws FederationPolicyException if there are no usable subclusters.
    */
   public static void validateSubClusterAvailability(
-      List<SubClusterId> activeSubClusters,
-      List<SubClusterId> blackListSubClusters)
+      Collection<SubClusterId> activeSubClusters,
+      Collection<SubClusterId> blackListSubClusters)
       throws FederationPolicyException {
     if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
       if (blackListSubClusters == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index 0cbda31..c4fc732 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@@ -136,7 +137,7 @@
 
     if (appSubmissionContext == null) {
       throw new FederationPolicyException(
-          "The ApplicationSubmissionContext " + "cannot be null.");
+          "The ApplicationSubmissionContext cannot be null.");
     }
 
     String queue = appSubmissionContext.getQueue();
@@ -148,51 +149,7 @@
       queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
     }
 
-    // the facade might cache this request, based on its parameterization
-    SubClusterPolicyConfiguration configuration = null;
-
-    try {
-      configuration = federationFacade.getPolicyConfiguration(queue);
-    } catch (YarnException e) {
-      String errMsg = "There is no policy configured for the queue: " + queue
-          + ", falling back to defaults.";
-      LOG.warn(errMsg, e);
-    }
-
-    // If there is no policy configured for this queue, fallback to the baseline
-    // policy that is configured either in the store or via XML config (and
-    // cached)
-    if (configuration == null) {
-      LOG.warn("There is no policies configured for queue: " + queue + " we"
-          + " fallback to default policy for: "
-          + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
-
-      queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
-      try {
-        configuration = federationFacade.getPolicyConfiguration(queue);
-      } catch (YarnException e) {
-        String errMsg = "Cannot retrieve policy configured for the queue: "
-            + queue + ", falling back to defaults.";
-        LOG.warn(errMsg, e);
-
-      }
-    }
-
-    // the fallback is not configure via store, but via XML, using
-    // previously loaded configuration.
-    if (configuration == null) {
-      configuration =
-          cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
-    }
-
-    // if the configuration has changed since last loaded, reinit the policy
-    // based on current configuration
-    if (!cachedConfs.containsKey(queue)
-        || !cachedConfs.get(queue).equals(configuration)) {
-      singlePolicyReinit(policyMap, cachedConfs, queue, configuration);
-    }
-
-    FederationRouterPolicy policy = policyMap.get(queue);
+    FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue);
     if (policy == null) {
       // this should never happen, as the to maps are updated together
       throw new FederationPolicyException("No FederationRouterPolicy found "
@@ -262,4 +219,92 @@
 
   }
 
+  /**
+   * This method provides a wrapper of all policy functionalities for routing a
+   * reservation. Internally it manages configuration changes, and policy
+   * init/reinit.
+   *
+   * @param request the reservation to route.
+   *
+   * @return the id of the subcluster that will be the "home" for this
+   *         reservation.
+   *
+   * @throws YarnException if there are issues initializing policies, or no
+   *           valid sub-cluster id could be found for this reservation.
+   */
+  public SubClusterId getReservationHomeSubCluster(
+      ReservationSubmissionRequest request) throws YarnException {
+
+    // the maps are concurrent, but we need to protect from reset()
+    // reinitialization mid-execution by creating a new reference local to this
+    // method.
+    Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap;
+    Map<String, FederationRouterPolicy> policyMap = globalPolicyMap;
+
+    if (request == null) {
+      throw new FederationPolicyException(
+          "The ReservationSubmissionRequest cannot be null.");
+    }
+
+    String queue = request.getQueue();
+    FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue);
+
+    if (policy == null) {
+      // this should never happen, as the to maps are updated together
+      throw new FederationPolicyException("No FederationRouterPolicy found "
+          + "for queue: " + request.getQueue() + " (while routing "
+          + "reservation: " + request.getReservationId() + ") "
+          + "and no default specified.");
+    }
+
+    return policy.getReservationHomeSubcluster(request);
+  }
+
+  private FederationRouterPolicy getFederationRouterPolicy(
+      Map<String, SubClusterPolicyConfiguration> cachedConfiguration,
+      Map<String, FederationRouterPolicy> policyMap, String queue)
+      throws FederationPolicyInitializationException {
+
+    // the facade might cache this request, based on its parameterization
+    SubClusterPolicyConfiguration configuration = null;
+    String copyQueue = queue;
+
+    try {
+      configuration = federationFacade.getPolicyConfiguration(copyQueue);
+    } catch (YarnException e) {
+      LOG.warn("There is no policy configured for the queue: {}, falling back to defaults.",
+          copyQueue, e);
+    }
+
+    // If there is no policy configured for this queue, fallback to the baseline
+    // policy that is configured either in the store or via XML config (and cached)
+    if (configuration == null) {
+      final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+      LOG.warn("There is no policies configured for queue: {} " +
+          "we fallback to default policy for: {}. ", copyQueue, policyKey);
+      copyQueue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+      try {
+        configuration = federationFacade.getPolicyConfiguration(copyQueue);
+      } catch (YarnException e) {
+        LOG.warn("Cannot retrieve policy configured for the queue: {}, falling back to defaults.",
+            copyQueue, e);
+      }
+    }
+
+    // the fallback is not configure via store, but via XML, using
+    // previously loaded configuration.
+    if (configuration == null) {
+      configuration = cachedConfiguration.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+    }
+
+    // if the configuration has changed since last loaded, reinit the policy
+    // based on current configuration
+    SubClusterPolicyConfiguration policyConfiguration =
+        cachedConfiguration.getOrDefault(copyQueue, null);
+    if (policyConfiguration == null || !policyConfiguration.equals(configuration)) {
+      singlePolicyReinit(policyMap, cachedConfiguration, copyQueue, configuration);
+    }
+
+    return policyMap.get(copyQueue);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
index 730fb41..dddc538 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
@@ -18,15 +18,22 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 
 /**
  * Base abstract class for {@link FederationRouterPolicy} implementations, that
@@ -63,4 +70,107 @@
     }
   }
 
+  /**
+   * This method is implemented by the specific policy, and it is used to route
+   * both reservations, and applications among a given set of
+   * sub-clusters.
+   *
+   * @param queue the queue for this application/reservation
+   * @param preSelectSubClusters a pre-filter set of sub-clusters
+   * @return the chosen sub-cluster
+   *
+   * @throws YarnException if the policy fails to choose a sub-cluster
+   */
+  protected abstract SubClusterId chooseSubCluster(String queue,
+      Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException;
+
+  /**
+   * Filter chosen SubCluster based on reservationId.
+   *
+   * @param reservationId the globally unique identifier for a reservation.
+   * @param activeSubClusters the map of ids to info for all active subclusters.
+   * @return the chosen sub-cluster
+   * @throws YarnException if the policy fails to choose a sub-cluster
+   */
+  protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
+      ReservationId reservationId, Map<SubClusterId, SubClusterInfo> activeSubClusters)
+      throws YarnException {
+
+    // if a reservation exists limit scope to the sub-cluster this
+    // reservation is mapped to
+    // TODO: Implemented in YARN-11236
+    return activeSubClusters;
+  }
+
+  /**
+   * Simply picks from alphabetically-sorted active subclusters based on the
+   * hash of quey name. Jobs of the same queue will all be routed to the same
+   * sub-cluster, as far as the number of active sub-cluster and their names
+   * remain the same.
+   *
+   * @param appContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
+   *
+   * @param blackLists the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
+   *
+   * @return a hash-based chosen {@link SubClusterId} that will be the "home"
+   *         for this application.
+   *
+   * @throws YarnException if there are no active subclusters.
+   */
+  @Override
+  public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext,
+      List<SubClusterId> blackLists) throws YarnException {
+
+    // null checks and default-queue behavior
+    validate(appContext);
+
+    // apply filtering based on reservation location and active sub-clusters
+    Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters(
+            appContext.getReservationID(), getActiveSubclusters());
+
+    FederationPolicyUtils.validateSubClusterAvailability(filteredSubClusters.keySet(), blackLists);
+
+    // remove black SubCluster
+    if (blackLists != null) {
+      blackLists.forEach(filteredSubClusters::remove);
+    }
+
+    // pick the chosen subCluster from the active ones
+    return chooseSubCluster(appContext.getQueue(), filteredSubClusters);
+  }
+
+  /**
+   * This method provides a wrapper of all policy functionalities for routing a
+   * reservation. Internally it manages configuration changes, and policy
+   * init/reinit.
+   *
+   * @param request the reservation to route.
+   *
+   * @return the id of the subcluster that will be the "home" for this
+   *         reservation.
+   *
+   * @throws YarnException if there are issues initializing policies, or no
+   *           valid sub-cluster id could be found for this reservation.
+   */
+  @Override
+  public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request)
+      throws YarnException {
+    if (request == null) {
+      throw new FederationPolicyException("The ReservationSubmissionRequest cannot be null.");
+    }
+
+    if (request.getQueue() == null) {
+      request.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    }
+
+    // apply filtering based on reservation location and active sub-clusters
+    Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters(
+        request.getReservationId(), getActiveSubclusters());
+
+    // pick the chosen subCluster from the active ones
+    return chooseSubCluster(request.getQueue(), filteredSubClusters);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
index 9325bd8..af58106 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
@@ -19,6 +19,7 @@
 
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@@ -49,4 +50,16 @@
   SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext,
       List<SubClusterId> blackListSubClusters) throws YarnException;
+
+  /**
+   * Determines the sub-cluster where a ReservationSubmissionRequest should be
+   * sent to.
+   *
+   * @param request the original request
+   * @return a mapping of sub-clusters and the requests
+   *
+   * @throws YarnException if the policy fails to choose a sub-cluster
+   */
+  SubClusterId getReservationHomeSubcluster(
+      ReservationSubmissionRequest request) throws YarnException;
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
index cc11880..5ac2d1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
@@ -22,11 +22,9 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -50,53 +48,12 @@
     setPolicyContext(federationPolicyContext);
   }
 
-  /**
-   * Simply picks from alphabetically-sorted active subclusters based on the
-   * hash of quey name. Jobs of the same queue will all be routed to the same
-   * sub-cluster, as far as the number of active sub-cluster and their names
-   * remain the same.
-   *
-   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
-   *          has to be routed to an appropriate subCluster for execution.
-   *
-   * @param blackListSubClusters the list of subClusters as identified by
-   *          {@link SubClusterId} to blackList from the selection of the home
-   *          subCluster.
-   *
-   * @return a hash-based chosen {@link SubClusterId} that will be the "home"
-   *         for this application.
-   *
-   * @throws YarnException if there are no active subclusters.
-   */
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blackListSubClusters) throws YarnException {
-
-    // throws if no active subclusters available
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
-
-    FederationPolicyUtils.validateSubClusterAvailability(
-        new ArrayList<SubClusterId>(activeSubclusters.keySet()),
-        blackListSubClusters);
-
-    if (blackListSubClusters != null) {
-
-      // Remove from the active SubClusters from StateStore the blacklisted ones
-      for (SubClusterId scId : blackListSubClusters) {
-        activeSubclusters.remove(scId);
-      }
-    }
-
-    validate(appSubmissionContext);
-
-    int chosenPosition = Math.abs(
-        appSubmissionContext.getQueue().hashCode() % activeSubclusters.size());
-
-    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+  protected SubClusterId chooseSubCluster(String queue,
+      Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
+    int chosenPosition = Math.abs(queue.hashCode() % preSelectSubclusters.size());
+    List<SubClusterId> list = new ArrayList<>(preSelectSubclusters.keySet());
     Collections.sort(list);
     return list.get(chosenPosition);
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index fa5eb4b..a86a43a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -17,14 +17,10 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
@@ -65,28 +61,12 @@
   }
 
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blacklist) throws YarnException {
-
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
-
-    FederationPolicyUtils.validateSubClusterAvailability(
-        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
-    Map<SubClusterIdInfo, Float> weights =
-        getPolicyInfo().getRouterPolicyWeights();
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
+    Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
     SubClusterIdInfo chosen = null;
     long currBestMem = -1;
-    for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
-        .entrySet()) {
-      if (blacklist != null && blacklist.contains(entry.getKey())) {
-        continue;
-      }
+    for (Map.Entry<SubClusterId, SubClusterInfo> entry : preSelectSubclusters.entrySet()) {
       SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
       if (weights.containsKey(id) && weights.get(id) > 0) {
         long availableMemory = getAvailableMemory(entry.getValue());
@@ -110,7 +90,7 @@
       mem = obj.getJSONObject("clusterMetrics").getLong("availableMB");
       return mem;
     } catch (JSONException j) {
-      throw new YarnException("FederationSubCluserInfo cannot be parsed", j);
+      throw new YarnException("FederationSubClusterInfo cannot be parsed", j);
     }
   }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java
index 469240a..3abcf6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Collections;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -78,7 +79,7 @@
     resolver = policyContext.getFederationSubclusterResolver();
     Map<SubClusterIdInfo, Float> weights =
         getPolicyInfo().getRouterPolicyWeights();
-    enabledSCs = new ArrayList<SubClusterId>();
+    enabledSCs = new ArrayList<>();
     for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
       if (entry != null && entry.getValue() > 0) {
         enabledSCs.add(entry.getKey().toId());
@@ -100,8 +101,7 @@
     // Fast path for FailForward to WeightedRandomRouterPolicy
     if (rrList == null || rrList.isEmpty() || (rrList.size() == 1
         && ResourceRequest.isAnyLocation(rrList.get(0).getResourceName()))) {
-      return super
-          .getHomeSubcluster(appSubmissionContext, blackListSubClusters);
+      return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
     }
 
     if (rrList.size() != 3) {
@@ -109,12 +109,11 @@
           "Invalid number of resource requests: " + rrList.size());
     }
 
-    Map<SubClusterId, SubClusterInfo> activeSubClusters =
-        getActiveSubclusters();
-    List<SubClusterId> validSubClusters =
-        new ArrayList<>(activeSubClusters.keySet());
-    FederationPolicyUtils
-        .validateSubClusterAvailability(validSubClusters, blackListSubClusters);
+    Map<SubClusterId, SubClusterInfo> activeSubClusters = getActiveSubclusters();
+    Set<SubClusterId> validSubClusters = activeSubClusters.keySet();
+    FederationPolicyUtils.validateSubClusterAvailability(activeSubClusters.keySet(),
+        blackListSubClusters);
+
     if (blackListSubClusters != null) {
       // Remove from the active SubClusters from StateStore the blacklisted ones
       validSubClusters.removeAll(blackListSubClusters);
@@ -128,20 +127,21 @@
       ResourceRequest nodeRequest = null;
       ResourceRequest rackRequest = null;
       ResourceRequest anyRequest = null;
+
       for (ResourceRequest rr : rrList) {
         // Handle "node" requests
         try {
           targetId = resolver.getSubClusterForNode(rr.getResourceName());
           nodeRequest = rr;
         } catch (YarnException e) {
-          LOG.error("Cannot resolve node : {}", e.getLocalizedMessage());
+          LOG.error("Cannot resolve node : {}.", e.getMessage());
         }
         // Handle "rack" requests
         try {
           resolver.getSubClustersForRack(rr.getResourceName());
           rackRequest = rr;
         } catch (YarnException e) {
-          LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage());
+          LOG.error("Cannot resolve rack : {}.", e.getMessage());
         }
         // Handle "ANY" requests
         if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
@@ -149,32 +149,33 @@
           continue;
         }
       }
+
       if (nodeRequest == null) {
-        throw new YarnException("Missing node request");
+        throw new YarnException("Missing node request.");
       }
       if (rackRequest == null) {
-        throw new YarnException("Missing rack request");
+        throw new YarnException("Missing rack request.");
       }
       if (anyRequest == null) {
-        throw new YarnException("Missing any request");
+        throw new YarnException("Missing any request.");
       }
-      LOG.info(
-          "Node request: " + nodeRequest.getResourceName() + ", Rack request: "
-              + rackRequest.getResourceName() + ", Any request: " + anyRequest
-              .getResourceName());
+
+      LOG.info("Node request: {} , Rack request: {} , Any request: {}.",
+          nodeRequest.getResourceName(), rackRequest.getResourceName(),
+          anyRequest.getResourceName());
+
       // Handle "node" requests
       if (validSubClusters.contains(targetId) && enabledSCs
           .contains(targetId)) {
-        LOG.info("Node {} is in SubCluster: {}", nodeRequest.getResourceName(),
-            targetId);
+        LOG.info("Node {} is in SubCluster: {}.", nodeRequest.getResourceName(), targetId);
         return targetId;
       } else {
         throw new YarnException("The node " + nodeRequest.getResourceName()
             + " is in a blacklist SubCluster or not active. ");
       }
     } catch (YarnException e) {
-      LOG.error("Validating resource requests failed, Falling back to "
-          + "WeightedRandomRouterPolicy placement: " + e.getMessage());
+      LOG.error("Validating resource requests failed, " +
+          "Falling back to WeightedRandomRouterPolicy placement : {}.", e.getMessage());
       // FailForward to WeightedRandomRouterPolicy
       // Overwrite request to use a default ANY
       ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
@@ -183,14 +184,10 @@
       amReq.setCapability(appSubmissionContext.getResource());
       amReq.setNumContainers(1);
       amReq.setRelaxLocality(true);
-      amReq.setNodeLabelExpression(
-          appSubmissionContext.getNodeLabelExpression());
-      amReq.setExecutionTypeRequest(
-          ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
-      appSubmissionContext
-          .setAMContainerResourceRequests(Collections.singletonList(amReq));
-      return super
-          .getHomeSubcluster(appSubmissionContext, blackListSubClusters);
+      amReq.setNodeLabelExpression(appSubmissionContext.getNodeLabelExpression());
+      amReq.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+      appSubmissionContext.setAMContainerResourceRequests(Collections.singletonList(amReq));
+      return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
     }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index b81ca07..7d50d38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -17,13 +17,9 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -37,30 +33,15 @@
 public class PriorityRouterPolicy extends AbstractRouterPolicy {
 
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blacklist) throws YarnException {
-
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
-
-    FederationPolicyUtils.validateSubClusterAvailability(
-        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
     // This finds the sub-cluster with the highest weight among the
     // currently active ones.
-    Map<SubClusterIdInfo, Float> weights =
-        getPolicyInfo().getRouterPolicyWeights();
+    Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
     SubClusterId chosen = null;
     Float currentBest = Float.MIN_VALUE;
-    for (SubClusterId id : activeSubclusters.keySet()) {
+    for (SubClusterId id : preSelectSubclusters.keySet()) {
       SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
-      if (blacklist != null && blacklist.contains(id)) {
-        continue;
-      }
       if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) {
         currentBest = weights.get(idInfo);
         chosen = id;
@@ -68,10 +49,8 @@
     }
     if (chosen == null) {
       throw new FederationPolicyException(
-          "No Active Subcluster with weight vector greater than zero");
+          "No Active Subcluster with weight vector greater than zero.");
     }
-
     return chosen;
   }
-
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
index b4c0192..32e31eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
@@ -17,15 +17,15 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
-import java.util.List;
+import java.util.Map;
 
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 
 /**
  * This {@link FederationRouterPolicy} simply rejects all incoming requests.
@@ -43,34 +43,12 @@
     setPolicyContext(federationPolicyContext);
   }
 
-  /**
-   * The policy always reject requests.
-   *
-   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
-   *          has to be routed to an appropriate subCluster for execution.
-   *
-   * @param blackListSubClusters the list of subClusters as identified by
-   *          {@link SubClusterId} to blackList from the selection of the home
-   *          subCluster.
-   *
-   * @return (never).
-   *
-   * @throws YarnException (always) to prevent applications in this queue to be
-   *           run anywhere in the federated cluster.
-   */
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blackListSubClusters) throws YarnException {
-
-    // run standard validation, as error might differ
-    validate(appSubmissionContext);
-
-    throw new FederationPolicyException("The policy configured for this queue"
-        + " (" + appSubmissionContext.getQueue() + ") reject all routing "
-        + "requests by construction. Application "
-        + appSubmissionContext.getApplicationId()
-        + " cannot be routed to any RM.");
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
+    throw new FederationPolicyException(
+        "The policy configured for this queue (" + queue + ") "
+        + "reject all routing requests by construction. Application in "
+        + queue + " cannot be routed to any RM.");
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index 7a8be91..3533296 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -22,11 +22,10 @@
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -55,50 +54,16 @@
         this.getClass().getCanonicalName());
 
     // note: this overrides AbstractRouterPolicy and ignores the weights
-
     setPolicyContext(policyContext);
   }
 
-  /**
-   * Simply picks a random active subCluster to start the AM (this does NOT
-   * depend on the weights in the policy).
-   *
-   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
-   *          has to be routed to an appropriate subCluster for execution.
-   *
-   * @param blackListSubClusters the list of subClusters as identified by
-   *          {@link SubClusterId} to blackList from the selection of the home
-   *          subCluster.
-   *
-   * @return a randomly chosen subcluster.
-   *
-   * @throws YarnException if there are no active subclusters.
-   */
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blackListSubClusters) throws YarnException {
-
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
-
-    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
-
-    FederationPolicyUtils.validateSubClusterAvailability(list,
-        blackListSubClusters);
-
-    if (blackListSubClusters != null) {
-
-      // Remove from the active SubClusters from StateStore the blacklisted ones
-      for (SubClusterId scId : blackListSubClusters) {
-        list.remove(scId);
-      }
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
+    if (preSelectSubclusters == null || preSelectSubclusters.isEmpty()) {
+      throw new FederationPolicyException("No available subcluster to choose from.");
     }
-
+    List<SubClusterId> list = new ArrayList<>(preSelectSubclusters.keySet());
     return list.get(rand.nextInt(list.size()));
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index b143410..f2acf66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -19,10 +19,8 @@
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@@ -35,47 +33,30 @@
  * sub-clusters.
  */
 public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
-
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blacklist) throws YarnException {
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
 
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
-
-    FederationPolicyUtils.validateSubClusterAvailability(
-        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
-    // note: we cannot pre-compute the weights, as the set of activeSubcluster
+    // note: we cannot pre-compute the weights, as the set of activeSubCluster
     // changes dynamically (and this would unfairly spread the load to
     // sub-clusters adjacent to an inactive one), hence we need to count/scan
     // the list and based on weight pick the next sub-cluster.
-    Map<SubClusterIdInfo, Float> weights =
-        getPolicyInfo().getRouterPolicyWeights();
+    Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
 
     ArrayList<Float> weightList = new ArrayList<>();
     ArrayList<SubClusterId> scIdList = new ArrayList<>();
     for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
-      if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
-        continue;
-      }
-      if (entry.getKey() != null
-          && activeSubclusters.containsKey(entry.getKey().toId())) {
+      SubClusterIdInfo key = entry.getKey();
+      if (key != null && preSelectSubclusters.containsKey(key.toId())) {
         weightList.add(entry.getValue());
-        scIdList.add(entry.getKey().toId());
+        scIdList.add(key.toId());
       }
     }
 
     int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList);
     if (pickedIndex == -1) {
-      throw new FederationPolicyException(
-          "No positive weight found on active subclusters");
+      throw new FederationPolicyException("No positive weight found on active subclusters");
     }
     return scIdList.get(pickedIndex);
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java
index bcf75ab..0ffe4ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.federation.store.records;
 
 import java.util.List;
+import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -36,7 +37,7 @@
   @Public
   @Unstable
   public static GetSubClustersInfoResponse newInstance(
-      List<SubClusterInfo> subClusters) {
+      Collection<SubClusterInfo> subClusters) {
     GetSubClustersInfoResponse subClusterInfos =
         Records.newRecord(GetSubClustersInfoResponse.class);
     subClusterInfos.setSubClusters(subClusters);
@@ -61,6 +62,5 @@
    */
   @Private
   @Unstable
-  public abstract void setSubClusters(List<SubClusterInfo> subClusters);
-
+  public abstract void setSubClusters(Collection<SubClusterInfo> subClusters);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java
index 5ecc3e2..2715708 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetSubClustersInfoResponsePBImpl.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -93,12 +95,12 @@
   }
 
   @Override
-  public void setSubClusters(List<SubClusterInfo> subClusters) {
+  public void setSubClusters(Collection<SubClusterInfo> subClusters) {
     if (subClusters == null) {
       builder.clearSubClusterInfos();
       return;
     }
-    this.subClusterInfos = subClusters;
+    this.subClusterInfos = subClusters.stream().collect(Collectors.toList());
   }
 
   private void initSubClustersInfoList() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 249efd3..d9ebd2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.federation.policies;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -28,20 +27,26 @@
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
 import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.junit.Test;
 
 /**
@@ -58,6 +63,9 @@
   private Random rand = new Random();
   private SubClusterId homeSubCluster;
 
+  private ReservationSubmissionRequest reservationSubmissionRequest =
+      mock(ReservationSubmissionRequest.class);
+
   @Test
   public void testReinitilialize() throws YarnException {
     FederationPolicyInitializationContext fpc =
@@ -177,11 +185,60 @@
   public void setMockActiveSubclusters(int numSubclusters) {
     for (int i = 1; i <= numSubclusters; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
-      SubClusterInfo sci = mock(SubClusterInfo.class);
-      when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
-      when(sci.getSubClusterId()).thenReturn(sc.toId());
+      SubClusterInfo sci = SubClusterInfo.newInstance(
+          sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", SubClusterState.SC_RUNNING,
+          System.currentTimeMillis(), "something");
       getActiveSubclusters().put(sc.toId(), sci);
     }
   }
 
+  public String generateClusterMetricsInfo(int id) {
+    long mem = 1024 * getRand().nextInt(277 * 100 - 1);
+    // plant a best cluster
+    if (id == 5) {
+      mem = 1024 * 277 * 100;
+    }
+    String clusterMetrics =
+        "{\"clusterMetrics\":{\"appsSubmitted\":65, \"appsCompleted\":64,\"appsPending\":0,"
+        + "\"appsRunning\":0, \"appsFailed\":0, \"appsKilled\":1,\"reservedMB\":0,\"availableMB\":"
+        + mem + ", \"allocatedMB\":0,\"reservedVirtualCores\":0, \"availableVirtualCores\":2216,"
+        + "\"allocatedVirtualCores\":0, \"containersAllocated\":0,\"containersReserved\":0,"
+        + "\"containersPending\":0,\"totalMB\":28364800, \"totalVirtualCores\":2216,"
+        + "\"totalNodes\":278, \"lostNodes\":1,\"unhealthyNodes\":0,\"decommissionedNodes\":0, "
+        + "\"rebootedNodes\":0, \"activeNodes\":277}}";
+    return clusterMetrics;
+  }
+
+  public FederationStateStoreFacade getMemoryFacade() throws YarnException {
+
+    // setting up a store and its facade (with caching off)
+    FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+    FederationStateStore store = new MemoryFederationStateStore();
+    store.init(conf);
+    fedFacade.reinitialize(store, conf);
+
+    for (SubClusterInfo sinfo : getActiveSubclusters().values()) {
+      store.registerSubCluster(SubClusterRegisterRequest.newInstance(sinfo));
+    }
+
+    return fedFacade;
+  }
+
+  public ReservationSubmissionRequest getReservationSubmissionRequest() {
+    return reservationSubmissionRequest;
+  }
+
+  public void setReservationSubmissionRequest(
+      ReservationSubmissionRequest reservationSubmissionRequest) {
+    this.reservationSubmissionRequest = reservationSubmissionRequest;
+  }
+
+  public void setupContext() throws YarnException {
+    FederationPolicyInitializationContext context =
+        FederationPoliciesTestUtil.initializePolicyContext2(getPolicy(),
+        getPolicyInfo(), getActiveSubclusters(), getMemoryFacade());
+    this.setFederationPolicyContext(context);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
index d09ba75..afa46b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
@@ -115,4 +116,14 @@
       }
     }
   }
+
+  @Test
+  public void testNullReservationContext() throws Exception {
+    FederationRouterPolicy policy = ((FederationRouterPolicy) getPolicy());
+
+    LambdaTestUtils.intercept(FederationPolicyException.class,
+        "The ReservationSubmissionRequest cannot be null.",
+        () -> policy.getReservationHomeSubcluster(null));
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
index ee3e09d..57f0b59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,8 +49,7 @@
     setMockActiveSubclusters(numSubclusters);
 
     // initialize policy with context
-    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(), getActiveSubclusters());
+    setupContext();
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
index 58f1b99..3f6c9d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
@@ -17,12 +17,13 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
-import static org.junit.Assert.fail;
-
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -46,12 +47,14 @@
     Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
     Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
 
+    long now = Time.now();
+
     // simulate 20 active subclusters
     for (int i = 0; i < 20; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i));
-      SubClusterInfo federationSubClusterInfo =
-          SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1,
-              SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i));
+      SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
+          sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
+          now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
       getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
       float weight = getRand().nextInt(2);
       if (i == 5) {
@@ -67,12 +70,11 @@
     getPolicyInfo().setRouterPolicyWeights(routerWeights);
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
 
-    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(), getActiveSubclusters());
-
+    // initialize policy with context
+    setupContext();
   }
 
-  private String generateClusterMetricsInfo(int id) {
+  public String generateClusterMetricsInfo(int id) {
 
     long mem = 1024 * getRand().nextInt(277 * 100 - 1);
     // plant a best cluster
@@ -106,7 +108,7 @@
   }
 
   @Test
-  public void testIfNoSubclustersWithWeightOne() {
+  public void testIfNoSubclustersWithWeightOne() throws Exception {
     setPolicy(new LoadBasedRouterPolicy());
     setPolicyInfo(new WeightedPolicyInfo());
     Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
@@ -123,15 +125,13 @@
     getPolicyInfo().setRouterPolicyWeights(routerWeights);
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
 
-    try {
-      FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-          getPolicyInfo(), getActiveSubclusters());
-      ((FederationRouterPolicy) getPolicy())
-          .getHomeSubcluster(getApplicationSubmissionContext(), null);
-      fail();
-    } catch (YarnException ex) {
-      Assert.assertTrue(
-          ex.getMessage().contains("Zero Active Subcluster with weight 1"));
-    }
+
+    ConfigurableFederationPolicy policy = getPolicy();
+    FederationPoliciesTestUtil.initializePolicyContext(policy,
+        getPolicyInfo(), getActiveSubclusters());
+
+    LambdaTestUtils.intercept(YarnException.class, "Zero Active Subcluster with weight 1.",
+        () ->  ((FederationRouterPolicy) policy).
+        getHomeSubcluster(getApplicationSubmissionContext(), null));
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java
index 0593932..3af0d03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java
@@ -73,6 +73,7 @@
 
     configureWeights(4);
 
+    // initialize policy with context
     initializePolicy(new YarnConfiguration());
   }
 
@@ -86,9 +87,7 @@
             .newInstance("queue1", getPolicy().getClass().getCanonicalName(),
                 buf));
     getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
-    FederationPoliciesTestUtil
-        .initializePolicyContext(getFederationPolicyContext(), getPolicy(),
-            getPolicyInfo(), getActiveSubclusters(), conf);
+    setupContext();
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
index e1799d3..ea03905 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@@ -54,10 +55,11 @@
 
       // with 5% omit a subcluster
       if (getRand().nextFloat() < 0.95f || i == 5) {
-        SubClusterInfo sci = mock(SubClusterInfo.class);
-        when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
-        when(sci.getSubClusterId()).thenReturn(sc.toId());
-        getActiveSubclusters().put(sc.toId(), sci);
+        long now = Time.now();
+        SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
+            sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
+            now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
+        getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
       }
       float weight = getRand().nextFloat();
       if (i == 5) {
@@ -105,7 +107,7 @@
         getPolicyInfo(), getActiveSubclusters());
 
     intercept(FederationPolicyException.class,
-        "No Active Subcluster with weight vector greater than zero",
+        "No Active Subcluster with weight vector greater than zero.",
         () -> ((FederationRouterPolicy) getPolicy())
             .getHomeSubcluster(getApplicationSubmissionContext(), null));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
index 1747f73..a3816b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
-import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,8 +38,7 @@
     setMockActiveSubclusters(2);
 
     // initialize policy with context
-    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(), getActiveSubclusters());
+    setupContext();
 
   }
 
@@ -59,5 +57,4 @@
             false, false, 0, Resources.none(), null, false, null, null);
     localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
index 05490ab..8346277 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
@@ -18,15 +18,14 @@
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
-import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,14 +43,14 @@
     setPolicyInfo(mock(WeightedPolicyInfo.class));
     for (int i = 1; i <= 2; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
-      SubClusterInfo sci = mock(SubClusterInfo.class);
-      when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
-      when(sci.getSubClusterId()).thenReturn(sc.toId());
-      getActiveSubclusters().put(sc.toId(), sci);
+      long now = Time.now();
+      SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
+          sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
+          now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
+      getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
     }
 
-    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        mock(WeightedPolicyInfo.class), getActiveSubclusters());
+    setupContext();
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
index d549250..8121ce2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@@ -32,7 +33,6 @@
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
-import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,8 +51,7 @@
 
     configureWeights(20);
 
-    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(), getActiveSubclusters());
+    setupContext();
   }
 
   public void configureWeights(float numSubClusters) {
@@ -68,10 +67,11 @@
       SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
       // with 5% omit a subcluster
       if (getRand().nextFloat() < 0.95f) {
-        SubClusterInfo sci = mock(SubClusterInfo.class);
-        when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
-        when(sci.getSubClusterId()).thenReturn(sc.toId());
-        getActiveSubclusters().put(sc.toId(), sci);
+        long now = Time.now();
+        SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
+            sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
+            now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
+        getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
       }
 
       // 80% of the weight is evenly spread, 20% is randomly generated
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index a9b9029..6ae64d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -159,6 +159,51 @@
         new Configuration());
   }
 
+  public static FederationPolicyInitializationContext initializePolicyContext2(
+      ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
+      Map<SubClusterId, SubClusterInfo> activeSubClusters,
+      FederationStateStoreFacade facade) throws YarnException {
+    FederationPolicyInitializationContext context =
+        new FederationPolicyInitializationContext(null, initResolver(), facade,
+        SubClusterId.newInstance("homesubcluster"));
+    return initializePolicyContext2(context, policy, policyInfo, activeSubClusters);
+  }
+
+  public static FederationPolicyInitializationContext initializePolicyContext2(
+      ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
+      Map<SubClusterId, SubClusterInfo> activeSubClusters)
+      throws YarnException {
+    return initializePolicyContext2(policy, policyInfo, activeSubClusters, initFacade());
+  }
+
+  public static FederationPolicyInitializationContext initializePolicyContext2(
+      FederationPolicyInitializationContext fpc,
+      ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
+      Map<SubClusterId, SubClusterInfo> activeSubClusters)
+      throws YarnException {
+    ByteBuffer buf = policyInfo.toByteBuffer();
+    fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
+        .newInstance("queue1", policy.getClass().getCanonicalName(), buf));
+
+    if (fpc.getFederationStateStoreFacade() == null) {
+      FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
+      FederationStateStore fss = mock(FederationStateStore.class);
+
+      if (activeSubClusters == null) {
+        activeSubClusters = new HashMap<>();
+      }
+
+      GetSubClustersInfoResponse response = GetSubClustersInfoResponse.newInstance(
+          activeSubClusters.values());
+
+      when(fss.getSubClusters(any())).thenReturn(response);
+      facade.reinitialize(fss, new Configuration());
+      fpc.setFederationStateStoreFacade(facade);
+    }
+    policy.reinitialize(fpc);
+    return fpc;
+  }
+
   /**
    * Initialize a {@link SubClusterResolver}.
    *
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 9415f3d..474d5fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletRequestWrapper;
@@ -179,19 +180,13 @@
       RouterServerUtil.logAndThrowException(
           FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
     }
-    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
-
-    FederationPolicyUtils.validateSubClusterAvailability(
-        list, blackListSubClusters);
-
+    Collection<SubClusterId> keySet = activeSubclusters.keySet();
+    FederationPolicyUtils.validateSubClusterAvailability(keySet, blackListSubClusters);
     if (blackListSubClusters != null) {
-
-      // Remove from the active SubClusters from StateStore the blacklisted ones
-      for (SubClusterId scId : blackListSubClusters) {
-        list.remove(scId);
-      }
+      keySet.removeAll(blackListSubClusters);
     }
 
+    List<SubClusterId> list = keySet.stream().collect(Collectors.toList());
     return list.get(rand.nextInt(list.size()));
   }