[STORM-3739] Scheduling should sort numa zones by host groups (#3379)

* [STORM-3739] Scheduling should sort numa zones by host groups

* [YSTORM-3739] Ignore blacklisted hosts and corresponding nodes/supervisors.

* [STORM-3739] Javadoc fix.

* [STORM-3739] Add check for null hostname, remove unused code, set totalResources.

* [STORM-3739] Remove unused methods.

* [STORM-3739] Sort hosts comparing average resources before comparing min resources.

* [STORM-3739] Add dead node check.

* [YSTORM-3739] NodeSortType changes and revert.

* [STORM-3739] Removed unused/commented code.

Co-authored-by: Bipin Prasad <bprasad@verizonmedia.com>
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index c584351..cc3561e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -19,12 +19,15 @@
 package org.apache.storm.scheduler;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 
@@ -283,6 +286,18 @@
     Map<String, List<String>> getNetworkTopography();
 
     /**
+     * Get host -> rack map - the inverse of networkTopography.
+     */
+    default Map<String, String> getHostToRack() {
+        Map<String, String> ret = new HashMap<>();
+        Map<String, List<String>> networkTopography = getNetworkTopography();
+        if (networkTopography != null) {
+            networkTopography.forEach((rack, hosts) -> hosts.forEach(host -> ret.put(host, rack)));
+        }
+        return ret;
+    }
+
+    /**
      * Get all topology scheduler statuses.
      */
     Map<String, String> getStatusMap();
@@ -339,4 +354,31 @@
      * Get the nimbus configuration.
      */
     Map<String, Object> getConf();
+
+    /**
+     * Determine the list of racks on which topologyIds have been assigned. Note that the returned set
+     * may contain {@link DNSToSwitchMapping#DEFAULT_RACK} if {@link #getHostToRack()} is null or
+     * does not contain the assigned host.
+     *
+     * @param topologyIds for which assignments are examined.
+     * @return set of racks on which assignments have been made.
+     */
+    default Set<String> getAssignedRacks(String... topologyIds) {
+        Set<String> ret = new HashSet<>();
+        Map<String, String> networkTopographyInverted = getHostToRack();
+        for (String topologyId: topologyIds) {
+            SchedulerAssignment assignment = getAssignmentById(topologyId);
+            if (assignment == null) {
+                continue;
+            }
+            for (WorkerSlot slot : assignment.getSlots()) {
+                String nodeId = slot.getNodeId();
+                SupervisorDetails supervisorDetails = getSupervisorById(nodeId);
+                String hostId = supervisorDetails.getHost();
+                String rackId = networkTopographyInverted.getOrDefault(hostId, DNSToSwitchMapping.DEFAULT_RACK);
+                ret.add(rackId);
+            }
+        }
+        return ret;
+    }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
index 427fb56..645c4e0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
@@ -156,6 +156,18 @@
         return hostnameToNodes;
     }
 
+    /**
+     * Get a map from RasNodeId to HostName.
+     *
+     * @return map of nodeId to hostname
+     */
+    public Map<String, String> getNodeIdToHostname() {
+        Map<String, String> nodeIdToHostname = new HashMap<>();
+        nodeMap.values()
+                .forEach(node -> nodeIdToHostname.put(node.getId(), node.getHostname()));
+        return nodeIdToHostname;
+    }
+
     @Override
     public String toString() {
         StringBuilder ret = new StringBuilder();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index ea77b1a..de097a6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -247,10 +247,16 @@
         return null;
     }
 
+    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb, String info) {
+        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+                        + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f', additionalInfo: '%s'",
+                used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb, info));
+    }
+
     private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
         throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
-                                                         + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
-                                                         used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+                        + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+                used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
     }
 
     /**
@@ -375,7 +381,8 @@
                 return 0;
             }
             if (used.otherResources[i] > otherResources[i]) {
-                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+                String info = String.format("%s, %f > %f", getResourceNameForResourceIndex(i), used.otherResources[i], otherResources[i]);
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb, info);
             }
             min = Math.min(min, used.otherResources[i] / otherResources[i]);
         }
@@ -384,14 +391,16 @@
 
     /**
      * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
-     * be less likely to be selected.
+     * be less likely to be selected. If the resource is in the request, make that resource positive.
      * @param request the requested resources.
      */
     public void updateForRareResourceAffinity(NormalizedResources request) {
         int length = Math.min(this.otherResources.length, request.otherResources.length);
         for (int i = 0; i < length; i++) {
-            if (request.getResourceAt(i) == 0.0) {
-                this.otherResources[i] = -1 * this.otherResources[i];
+            if (request.getResourceAt(i) == 0.0 && this.otherResources[i] > 0.0) {
+                this.otherResources[i] = -this.otherResources[i]; // make negative
+            } else if (request.getResourceAt(i) > 0.0 && this.otherResources[i] < 0.0) {
+                this.otherResources[i] = -this.otherResources[i]; // make positive
             }
         }
     }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 6f49083..4bc4d0b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -44,6 +44,7 @@
 import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
 import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
 import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
@@ -58,9 +59,25 @@
      * Refer to {@link NodeSorter#NodeSorter(Cluster, TopologyDetails, NodeSortType)} for more details.
      */
     public enum NodeSortType {
-        GENERIC_RAS, // for deprecation, Used by GenericResourceAwareStrategyOld
-        DEFAULT_RAS, // for deprecation, Used by DefaultResourceAwareStrategyOld
-        COMMON       // new and only node sorting type going forward
+        /**
+         * Generic Resource Aware Strategy sorting type.
+         * @deprecated used by GenericResourceAwareStrategyOld only. Use {link #COMMON} instead.
+         */
+        @Deprecated
+        GENERIC_RAS,
+
+        /**
+         * Default Resource Aware Strategy sorting type.
+         * @deprecated used by DefaultResourceAwareStrategyOld only. Use {link #COMMON} instead.
+         */
+        @Deprecated
+        DEFAULT_RAS,
+
+        /**
+         * New and only node sorting type going forward.
+         * {@link NodeSorterHostProximity#NodeSorterHostProximity(Cluster, TopologyDetails)} for more details
+         */
+        COMMON,
     }
 
     // instance variables from class instantiation
@@ -149,7 +166,8 @@
         List<ExecutorDetails> orderedExecutors = execSorter.sortExecutors(unassignedExecutors);
         Iterable<String> sortedNodes = null;
         if (!this.sortNodesForEachExecutor) {
-            sortedNodes = nodeSorter.sortAllNodes(null);
+            nodeSorter.prepare(null);
+            sortedNodes = nodeSorter.sortAllNodes();
         }
         return scheduleExecutorsOnNodes(orderedExecutors, sortedNodes);
     }
@@ -189,7 +207,7 @@
         LOG.debug("The max state search that will be used by topology {} is {}", topologyDetails.getId(), maxStateSearch);
 
         searcherState = createSearcherState();
-        setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
+        setNodeSorter(new NodeSorterHostProximity(cluster, topologyDetails, nodeSortType));
         setExecSorter(orderExecutorsByProximity
                 ? new ExecSorterByProximity(topologyDetails)
                 : new ExecSorterByConnectionCount(topologyDetails));
@@ -413,7 +431,8 @@
         for (int i = 0; i < maxExecCnt ; i++) {
             progressIdxForExec[i] = -1;
         }
-        LOG.info("scheduleExecutorsOnNodes: will assign {} executors for topo {}", maxExecCnt, topoName);
+        LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}, sortNodesForEachExecutor={}",
+                maxExecCnt, topoName, sortNodesForEachExecutor);
 
         OUTERMOST_LOOP:
         for (int loopCnt = 0 ; true ; loopCnt++) {
@@ -450,7 +469,8 @@
             String comp = execToComp.get(exec);
             if (sortedNodesIter == null || (this.sortNodesForEachExecutor && searcherState.isExecCompDifferentFromPrior())) {
                 progressIdx = -1;
-                sortedNodesIter = nodeSorter.sortAllNodes(exec);
+                nodeSorter.prepare(exec);
+                sortedNodesIter = nodeSorter.sortAllNodes();
             }
 
             for (String nodeId : sortedNodesIter) {
@@ -470,8 +490,8 @@
                     if (numBoundAckerAssigned == -1) {
                         // This only happens when trying to assign bound ackers to the worker slot and failed.
                         // Free the entire worker slot and put those bound ackers back to unassigned list
-                        LOG.debug("Failed to assign bound acker for exec: {} of topo: {} to worker: {}.  Backtracking.",
-                            exec, topoName, workerSlot);
+                        LOG.debug("Failed to assign bound acker for exec={}, comp={}, topo: {} to worker: {}.  Backtracking.",
+                            exec, comp, topoName, workerSlot);
                         searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
                         continue;
                     }
@@ -481,9 +501,13 @@
                         // and this is not the first exec to this workerSlot.
                         // So just go to next workerSlot and don't free the worker.
                         if (numBoundAckerAssigned > 0) {
-                            LOG.debug("Failed to assign exec: {} of topo: {} with bound ackers to worker: {}.  Backtracking.",
-                                exec, topoName, workerSlot);
+                            LOG.debug("Failed to assign exec={}, comp={}, topo={} with bound ackers to worker: {}.  Backtracking.",
+                                exec, comp, topoName, workerSlot);
                             searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
+                        } else {
+                            LOG.debug("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
+                                exec, comp, topoName, workerSlot,
+                                node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
                         }
                         continue;
                     }
@@ -506,8 +530,10 @@
                     searcherState = searcherState.nextExecutor();
                     nodeForExec[execIndex] = node;
                     workerSlotForExec[execIndex] = workerSlot;
-                    LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}, slot-ordinal={} at loopCnt={}, topo={}",
-                            execIndex, comp, nodeId, progressIdx, loopCnt, topoName);
+                    LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}/cpu={}/mem={}, "
+                            + "slot-ordinal={} at loopCnt={}, topo={}",
+                        execIndex, comp, nodeId, node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
+                        progressIdx, loopCnt, topoName);
                     continue OUTERMOST_LOOP;
                 }
             }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
index 5532d78..7b4f7d2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
@@ -30,7 +30,7 @@
 
     /**
      * Amongst all {@link #availableResources}, this is the minimum ratio of resource to the total available in group.
-     * Note that nodes are grouped into racks. And racks are grouped under the cluster.
+     * Note that nodes are grouped into hosts. Hosts into racks. And racks are grouped under the cluster.
      *
      * <p>
      * An example of this calculation is in
@@ -43,7 +43,7 @@
 
     /**
      * Amongst all {@link #availableResources}, this is the average ratio of resource to the total available in group.
-     * Note that nodes are grouped into racks. And racks are grouped under the cluster.
+     * Note that nodes are grouped into hosts, hosts into racks, and racks are grouped under the cluster.
      *
      * <p>
      * An example of this calculation is in
@@ -73,6 +73,11 @@
         this.avgResourcePercent = avgResourcePercent;
     }
 
+    public void add(ObjectResourcesItem other) {
+        this.availableResources.add(other.availableResources);
+        this.totalResources.add(other.totalResources);
+    }
+
     @Override
     public String toString() {
         return this.id;
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
index ceda82e..f2a4e8d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
@@ -18,14 +18,20 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
 
-import java.util.TreeSet;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
 
 
 public interface INodeSorter {
 
-    TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec);
+    /**
+     * Prepare for node sorting. This method must be called before {@link #getSortedRacks()} and {@link #sortAllNodes()}.
+     *
+     * @param exec optional, may be null.
+     */
+    void prepare(ExecutorDetails exec);
 
-    Iterable<String> sortAllNodes(ExecutorDetails exec);
+    Iterable<ObjectResourcesItem> getSortedRacks();
+
+    Iterable<String> sortAllNodes();
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
index 85a9015..7ff4b05 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -28,7 +29,6 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -70,6 +70,9 @@
     protected List<String> favoredNodeIds;
     protected List<String> unFavoredNodeIds;
 
+    // Updated in prepare method
+    ExecutorDetails exec;
+
     /**
      * Initialize for the default implementation node sorting.
      *
@@ -93,21 +96,18 @@
 
         // from Cluster
         networkTopography = cluster.getNetworkTopography();
-        Map<String, String> hostToRack = new HashMap<>();
-        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
-            String rackId = entry.getKey();
-            for (String hostName: entry.getValue()) {
-                hostToRack.put(hostName, rackId);
-            }
-        }
+        Map<String, String> hostToRack = cluster.getHostToRack();
         RasNodes nodes = new RasNodes(cluster);
         for (RasNode node: nodes.getNodes()) {
             String superId = node.getId();
             String hostName = node.getHostname();
+            if (!node.isAlive() || hostName == null) {
+                continue;
+            }
             String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
             superIdToRack.put(superId, rackId);
             hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
-            rackIdToNodes.computeIfAbsent(rackId, (hn) -> new ArrayList<>()).add(node);
+            rackIdToNodes.computeIfAbsent(rackId, (rid) -> new ArrayList<>()).add(node);
         }
         this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
 
@@ -122,16 +122,21 @@
         unFavoredNodeIds.removeAll(favoredNodeIds);
     }
 
+    @Override
+    public void prepare(ExecutorDetails exec) {
+        this.exec = exec;
+    }
+
     /**
-     * Scheduling uses {@link #sortAllNodes(ExecutorDetails)} which eventually
-     * calls this method whose behavior can altered by setting {@link #nodeSortType}.
+     * Scheduling uses {@link #sortAllNodes()} which eventually
+     * calls this method whose behavior can be altered by setting {@link #nodeSortType}.
      *
      * @param resourcesSummary     contains all individual {@link ObjectResourcesItem} as well as cumulative stats
      * @param exec                 executor for which the sorting is done
      * @param existingScheduleFunc a function to get existing executors already scheduled on this object
      * @return a sorted list of {@link ObjectResourcesItem}
      */
-    protected TreeSet<ObjectResourcesItem> sortObjectResources(
+    protected List<ObjectResourcesItem> sortObjectResources(
             ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, ExistingScheduleFunc existingScheduleFunc) {
         switch (nodeSortType) {
             case DEFAULT_RAS:
@@ -180,7 +185,7 @@
      * @param existingScheduleFunc a function to get existing executors already scheduled on this object
      * @return a sorted list of ObjectResources
      */
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesCommon(
+    private List<ObjectResourcesItem> sortObjectResourcesCommon(
             final ObjectResourcesSummary allResources, final ExecutorDetails exec,
             final ExistingScheduleFunc existingScheduleFunc) {
         // Copy and modify allResources
@@ -203,34 +208,31 @@
         );
 
         // Use the following comparator to return a sorted set
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        double o1Avg = o1.avgResourcePercent;
-                        double o2Avg = o2.avgResourcePercent;
-
-                        if (o1Avg > o2Avg) {
-                            return -1;
-                        } else if (o1Avg < o2Avg) {
-                            return 1;
-                        } else {
-                            if (o1.minResourcePercent > o2.minResourcePercent) {
-                                return -1;
-                            } else if (o1.minResourcePercent < o2.minResourcePercent) {
-                                return 1;
-                            } else {
-                                return o1.id.compareTo(o2.id);
-                            }
-                        }
-                    }
-                });
+        List<ObjectResourcesItem> sortedObjectResources = new ArrayList();
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
         sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        sortedObjectResources.sort(comparator);
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
     }
@@ -258,38 +260,35 @@
      * @return a sorted list of ObjectResources
      */
     @Deprecated
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesGeneric(
+    private List<ObjectResourcesItem> sortObjectResourcesGeneric(
             final ObjectResourcesSummary allResources, ExecutorDetails exec,
             final ExistingScheduleFunc existingScheduleFunc) {
         ObjectResourcesSummary affinityBasedAllResources = new ObjectResourcesSummary(allResources);
         NormalizedResourceRequest requestedResources = topologyDetails.getTotalResources(exec);
-        for (ObjectResourcesItem objectResources : affinityBasedAllResources.getObjectResources()) {
-            objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
-        }
+        affinityBasedAllResources.getObjectResources()
+                .forEach(x -> x.availableResources.updateForRareResourceAffinity(requestedResources));
         final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
 
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        double o1Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
-                        double o2Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
-                        if (o1Avg > o2Avg) {
-                            return -1;
-                        } else if (o1Avg < o2Avg) {
-                            return 1;
-                        } else {
-                            return o1.id.compareTo(o2.id);
-                        }
-                    }
-                });
+        List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+            double o2Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
         sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        sortedObjectResources.sort(comparator);
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
     }
@@ -313,7 +312,7 @@
      * @return a sorted list of ObjectResources
      */
     @Deprecated
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesDefault(
+    private List<ObjectResourcesItem> sortObjectResourcesDefault(
             final ObjectResourcesSummary allResources,
             final ExistingScheduleFunc existingScheduleFunc) {
 
@@ -328,32 +327,30 @@
                     existingScheduleFunc.getNumExistingSchedule(objectResources.id));
         }
 
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        if (o1.minResourcePercent > o2.minResourcePercent) {
-                            return -1;
-                        } else if (o1.minResourcePercent < o2.minResourcePercent) {
-                            return 1;
-                        } else {
-                            double diff = o1.avgResourcePercent - o2.avgResourcePercent;
-                            if (diff > 0.0) {
-                                return -1;
-                            } else if (diff < 0.0) {
-                                return 1;
-                            } else {
-                                return o1.id.compareTo(o2.id);
-                            }
-                        }
-                    }
-                });
+        List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+            if (diff > 0.0) {
+                return -1;
+            } else if (diff < 0.0) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
         sortedObjectResources.addAll(allResources.getObjectResources());
+        sortedObjectResources.sort(comparator);
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
     }
@@ -375,7 +372,7 @@
      * @param rackId     the rack id availNodes are a part of
      * @return a sorted list of nodes.
      */
-    private TreeSet<ObjectResourcesItem> sortNodes(
+    private List<ObjectResourcesItem> sortNodes(
             List<RasNode> availRasNodes, ExecutorDetails exec, String rackId,
             Map<String, AtomicInteger> scheduledCount) {
         ObjectResourcesSummary rackResourcesSummary = new ObjectResourcesSummary("RACK");
@@ -428,7 +425,7 @@
         private final Iterator<String> post;
         private final Set<String> skip;
 
-        LazyNodeSortingIterator(LazyNodeSorting parent, TreeSet<ObjectResourcesItem> sortedRacks) {
+        LazyNodeSortingIterator(LazyNodeSorting parent, List<ObjectResourcesItem> sortedRacks) {
             this.parent = parent;
             rackIterator = sortedRacks.iterator();
             pre = favoredNodeIds.iterator();
@@ -495,8 +492,8 @@
 
     private class LazyNodeSorting implements Iterable<String> {
         private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
-        private final TreeSet<ObjectResourcesItem> sortedRacks;
-        private final Map<String, TreeSet<ObjectResourcesItem>> cachedNodes = new HashMap<>();
+        private final List<ObjectResourcesItem> sortedRacks;
+        private final Map<String, List<ObjectResourcesItem>> cachedNodes = new HashMap<>();
         private final ExecutorDetails exec;
         private final Set<String> skippedNodeIds = new HashSet<>();
 
@@ -516,10 +513,10 @@
                         .getAndAdd(entry.getValue().size());
                 }
             }
-            sortedRacks = sortRacks(exec);
+            sortedRacks = getSortedRacks();
         }
 
-        private TreeSet<ObjectResourcesItem> getSortedNodesFor(String rackId) {
+        private List<ObjectResourcesItem> getSortedNodesFor(String rackId) {
             return cachedNodes.computeIfAbsent(rackId,
                 (rid) -> sortNodes(rackIdToNodes.getOrDefault(rid, Collections.emptyList()), exec, rid, perNodeScheduledCount));
         }
@@ -531,7 +528,7 @@
     }
 
     @Override
-    public Iterable<String> sortAllNodes(ExecutorDetails exec) {
+    public Iterable<String> sortAllNodes() {
         return new LazyNodeSorting(exec);
     }
 
@@ -590,8 +587,7 @@
      *
      * @return a sorted list of racks
      */
-    @Override
-    public TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec) {
+    public List<ObjectResourcesItem> getSortedRacks() {
 
         final ObjectResourcesSummary clusterResourcesSummary = createClusterSummarizedResources();
         final Map<String, AtomicInteger> scheduledCount = getScheduledExecCntByRackId();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
new file mode 100644
index 0000000..64f9f08
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeSorterHostProximity.class);
+
+    // instance variables from class instantiation
+    protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
+
+    protected Cluster cluster;
+    protected TopologyDetails topologyDetails;
+
+    // Instance variables derived from Cluster.
+    private final Map<String, String> superIdToRack = new HashMap<>();
+    private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
+    private final Map<String, String> nodeIdToHostname = new HashMap<>();
+    private final Map<String, Set<String>> rackIdToHosts = new HashMap<>();
+    protected List<String> greyListedSupervisorIds;
+
+    // Instance variables from Cluster and TopologyDetails.
+    protected List<String> favoredNodeIds;
+    protected List<String> unFavoredNodeIds;
+
+    // Updated in prepare method
+    ExecutorDetails exec;
+
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails topologyDetails) {
+        this(cluster, topologyDetails, BaseResourceAwareStrategy.NodeSortType.COMMON);
+    }
+
+    /**
+     * Initialize for the default implementation node sorting.
+     *
+     * <p>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting implemented in
+     *  {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting implemented in
+     *  {@link #sortObjectResourcesDefault(ObjectResourcesSummary, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     *  <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting implemented in
+     *  {@link #sortObjectResourcesCommon(ObjectResourcesSummary, ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+     * </p>
+     *
+     * @param cluster for which nodes will be sorted.
+     * @param topologyDetails the topology to sort for.
+     * @param nodeSortType type of sorting to be applied to object resource collection {@link BaseResourceAwareStrategy.NodeSortType}.
+     */
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+        this.cluster = cluster;
+        this.topologyDetails = topologyDetails;
+        this.nodeSortType = nodeSortType;
+
+        // from Cluster
+        greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+        Map<String, String> hostToRack = cluster.getHostToRack();
+        RasNodes nodes = new RasNodes(cluster);
+        for (RasNode node: nodes.getNodes()) {
+            String superId = node.getId();
+            String hostName = node.getHostname();
+            if (!node.isAlive() || hostName == null) {
+                continue;
+            }
+            String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
+            superIdToRack.put(superId, rackId);
+            hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
+            nodeIdToHostname.put(superId, hostName);
+            rackIdToHosts.computeIfAbsent(rackId, r -> new HashSet<>()).add(hostName);
+        }
+
+        // from TopologyDetails
+        Map<String, Object> topoConf = topologyDetails.getConf();
+
+        // From Cluster and TopologyDetails - and cleaned-up
+        favoredNodeIds = makeHostToNodeIds((List<String>) topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+        unFavoredNodeIds = makeHostToNodeIds((List<String>) topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+        favoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(greyListedSupervisorIds);
+        unFavoredNodeIds.removeAll(favoredNodeIds);
+    }
+
+    @VisibleForTesting
+    public Map<String, Set<String>> getRackIdToHosts() {
+        return rackIdToHosts;
+    }
+
+    @Override
+    public void prepare(ExecutorDetails exec) {
+        this.exec = exec;
+    }
+
+    /**
+     * Scheduling uses {@link #sortAllNodes()} which eventually
+     * calls this method whose behavior can be altered by setting {@link #nodeSortType}.
+     *
+     * @param resourcesSummary     contains all individual {@link ObjectResourcesItem} as well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    protected Iterable<ObjectResourcesItem> sortObjectResources(
+            ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, ExistingScheduleFunc existingScheduleFunc) {
+        switch (nodeSortType) {
+            case DEFAULT_RAS:
+                return sortObjectResourcesDefault(resourcesSummary, existingScheduleFunc);
+            case GENERIC_RAS:
+                return sortObjectResourcesGeneric(resourcesSummary, exec, existingScheduleFunc);
+            case COMMON:
+                return sortObjectResourcesCommon(resourcesSummary, exec, existingScheduleFunc);
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Sort objects by the following three criteria.
+     *
+     * <li>
+     *     The number executors of the topology that needs to be scheduled is already on the object (node or rack)
+     *     in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on
+     *     the same object (node or rack) as the existing executors of the topology.
+     * </li>
+     *
+     * <li>
+     *     The subordinate/subservient resource availability percentage of a rack in descending order We calculate the
+     *     resource availability percentage by dividing the resource availability of the object (node or rack) by the
+     *     resource availability of the entire rack or cluster depending on if object references a node or a rack.
+     *     How this differs from the DefaultResourceAwareStrategy is that the percentage boosts the node or rack if it is
+     *     requested by the executor that the sorting is being done for and pulls it down if it is not.
+     *     By doing this calculation, objects (node or rack) that have exhausted or little of one of the resources mentioned
+     *     above will be ranked after racks that have more balanced resource availability and nodes or racks that have
+     *     resources that are not requested will be ranked below . So we will be less likely to pick a rack that
+     *     have a lot of one resource but a low amount of another and have a lot of resources that are not requested by the executor.
+     *     This is similar to logic used {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, ExistingScheduleFunc)}.
+     * </li>
+     *
+     * <li>
+     *     The tie between two nodes with same resource availability is broken by using the node with lower minimum
+     *     percentage used. This comparison was used in {@link #sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)}
+     *     but here it is made subservient to modified resource availbility used in
+     *     {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, ExistingScheduleFunc)}.
+     *
+     * </li>
+     *
+     * @param allResources         contains all individual ObjectResources as well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
+            final ObjectResourcesSummary allResources, final ExecutorDetails exec,
+            final ExistingScheduleFunc existingScheduleFunc) {
+        // Copy and modify allResources
+        ObjectResourcesSummary affinityBasedAllResources = new ObjectResourcesSummary(allResources);
+        final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
+        final NormalizedResourceRequest requestedResources = (exec != null) ? topologyDetails.getTotalResources(exec) : null;
+        affinityBasedAllResources.getObjectResources().forEach(
+            x -> {
+                if (requestedResources != null) {
+                    // negate unrequested resources
+                    x.availableResources.updateForRareResourceAffinity(requestedResources);
+                }
+                x.minResourcePercent = availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+                x.avgResourcePercent = availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+                LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, numExistingSchedule={}",
+                        x.id, x.minResourcePercent, x.avgResourcePercent,
+                        existingScheduleFunc.getNumExistingSchedule(x.id));
+            }
+        );
+
+        // Use the following comparator to sort
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new TreeSet(comparator);
+        sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Sort objects by the following two criteria.
+     *
+     * <li>the number executors of the topology that needs to be scheduled is already on the
+     * object (node or rack) in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest
+     * of a topology on the same object (node or rack) as the existing executors of the topology.</li>
+     *
+     * <li>the subordinate/subservient resource availability percentage of a rack in descending order We calculate the
+     * resource availability percentage by dividing the resource availability of the object (node or rack) by the
+     * resource availability of the entire rack or cluster depending on if object references a node or a rack.
+     * How this differs from the DefaultResourceAwareStrategy is that the percentage boosts the node or rack if it is
+     * requested by the executor that the sorting is being done for and pulls it down if it is not.
+     * By doing this calculation, objects (node or rack) that have exhausted or little of one of the resources mentioned
+     * above will be ranked after racks that have more balanced resource availability and nodes or racks that have
+     * resources that are not requested will be ranked below . So we will be less likely to pick a rack that
+     * have a lot of one resource but a low amount of another and have a lot of resources that are not requested by the executor.</li>
+     *
+     * @param allResources         contains all individual ObjectResources as well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    @Deprecated
+    private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
+            final ObjectResourcesSummary allResources, ExecutorDetails exec,
+            final ExistingScheduleFunc existingScheduleFunc) {
+        ObjectResourcesSummary affinityBasedAllResources = new ObjectResourcesSummary(allResources);
+        final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
+        final NormalizedResourceRequest requestedResources = (exec != null) ? topologyDetails.getTotalResources(exec) : null;
+        affinityBasedAllResources.getObjectResources().forEach(
+            x -> {
+                if (requestedResources != null) {
+                    // negate unrequested resources
+                    x.availableResources.updateForRareResourceAffinity(requestedResources);
+                }
+                x.minResourcePercent = availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+                x.avgResourcePercent = availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+                LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, numExistingSchedule={}",
+                    x.id, x.minResourcePercent, x.avgResourcePercent,
+                    existingScheduleFunc.getNumExistingSchedule(x.id));
+            }
+        );
+
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new TreeSet<>(comparator);
+        sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Sort objects by the following two criteria.
+     *
+     * <li>the number executors of the topology that needs to be scheduled is already on the
+     * object (node or rack) in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest
+     * of a topology on the same object (node or rack) as the existing executors of the topology.</li>
+     *
+     * <li>the subordinate/subservient resource availability percentage of a rack in descending order We calculate the
+     * resource availability percentage by dividing the resource availability of the object (node or rack) by the
+     * resource availability of the entire rack or cluster depending on if object references a node or a rack.
+     * By doing this calculation, objects (node or rack) that have exhausted or little of one of the resources mentioned
+     * above will be ranked after racks that have more balanced resource availability. So we will be less likely to pick
+     * a rack that have a lot of one resource but a low amount of another.</li>
+     *
+     * @param allResources         contains all individual ObjectResources as well as cumulative stats
+     * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+     */
+    @Deprecated
+    private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
+            final ObjectResourcesSummary allResources,
+            final ExistingScheduleFunc existingScheduleFunc) {
+
+        final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
+        for (ObjectResourcesItem objectResources : allResources.getObjectResources()) {
+            objectResources.minResourcePercent =
+                    availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+            objectResources.avgResourcePercent =
+                    availableResourcesOverall.calculateAveragePercentageUsedBy(objectResources.availableResources);
+            LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, numExistingSchedule={}",
+                    objectResources.id, objectResources.minResourcePercent, objectResources.avgResourcePercent,
+                    existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+        }
+
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+            if (diff > 0.0) {
+                return -1;
+            } else if (diff < 0.0) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new TreeSet<>(comparator);
+        sortedObjectResources.addAll(allResources.getObjectResources());
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled is already on the node in
+     * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a node in descending
+     * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of
+     * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this
+     * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of
+     * one resource but a low amount of another.
+     *
+     * @param availHosts a collection of all the hosts we want to sort
+     * @param rackId     the rack id availNodes are a part of
+     * @return an iterable of sorted hosts.
+     */
+    private Iterable<ObjectResourcesItem> sortHosts(
+            Collection<String> availHosts, ExecutorDetails exec, String rackId,
+            Map<String, AtomicInteger> scheduledCount) {
+        ObjectResourcesSummary rackResourcesSummary = new ObjectResourcesSummary("RACK");
+        availHosts.forEach(h -> {
+            ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
+            for (RasNode x : hostnameToNodes.get(h)) {
+                hostItem.add(new ObjectResourcesItem(x.getId(), x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
+            }
+            rackResourcesSummary.addObjectResourcesItem(hostItem);
+        });
+
+        LOG.debug(
+                "Rack {}: Overall Avail [ {} ] Total [ {} ]",
+                rackId,
+                rackResourcesSummary.getAvailableResourcesOverall(),
+                rackResourcesSummary.getTotalResourcesOverall());
+
+        return sortObjectResources(
+            rackResourcesSummary,
+            exec,
+            (hostId) -> {
+                AtomicInteger count = scheduledCount.get(hostId);
+                if (count == null) {
+                    return 0;
+                }
+                return count.get();
+            });
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled is already on the node in
+     * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a node in descending
+     * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of
+     * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this
+     * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of
+     * one resource but a low amount of another.
+     *
+     * @param availRasNodes a list of all the nodes we want to sort
+     * @param hostId     the host-id that availNodes are a part of
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for nodes.
+     */
+    private Iterable<ObjectResourcesItem> sortNodes(
+            List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+            Map<String, AtomicInteger> scheduledCount) {
+        ObjectResourcesSummary hostResourcesSummary = new ObjectResourcesSummary("HOST");
+        availRasNodes.forEach(x ->
+                hostResourcesSummary.addObjectResourcesItem(
+                        new ObjectResourcesItem(x.getId(), x.getTotalAvailableResources(), x.getTotalResources(), 0, 0)
+                )
+        );
+
+        LOG.debug(
+            "Host {}: Overall Avail [ {} ] Total [ {} ]",
+            hostId,
+            hostResourcesSummary.getAvailableResourcesOverall(),
+            hostResourcesSummary.getTotalResourcesOverall());
+
+        return sortObjectResources(
+            hostResourcesSummary,
+            exec,
+            (superId) -> {
+                AtomicInteger count = scheduledCount.get(superId);
+                if (count == null) {
+                    return 0;
+                }
+                return count.get();
+            });
+    }
+
+    protected List<String> makeHostToNodeIds(List<String> hosts) {
+        if (hosts == null) {
+            return Collections.emptyList();
+        }
+        List<String> ret = new ArrayList<>(hosts.size());
+        for (String host: hosts) {
+            List<RasNode> nodes = hostnameToNodes.get(host);
+            if (nodes != null) {
+                for (RasNode node : nodes) {
+                    ret.add(node.getId());
+                }
+            }
+        }
+        return ret;
+    }
+
+    private class LazyNodeSortingIterator implements Iterator<String> {
+        private final LazyNodeSorting parent;
+        private final Iterator<ObjectResourcesItem> rackIterator;
+        private Iterator<ObjectResourcesItem> hostIterator;
+        private Iterator<ObjectResourcesItem> nodeIterator;
+        private String nextValueFromNode = null;
+        private final Iterator<String> pre;
+        private final Iterator<String> post;
+        private final Set<String> skip;
+
+        LazyNodeSortingIterator(LazyNodeSorting parent, Iterable<ObjectResourcesItem> sortedRacks) {
+            this.parent = parent;
+            rackIterator = sortedRacks.iterator();
+            pre = favoredNodeIds.iterator();
+            post = Stream.concat(unFavoredNodeIds.stream(), greyListedSupervisorIds.stream())
+                            .collect(Collectors.toList())
+                            .iterator();
+            skip = parent.skippedNodeIds;
+        }
+
+        private Iterator<ObjectResourcesItem> getNodeIterator() {
+            if (nodeIterator != null && nodeIterator.hasNext()) {
+                return nodeIterator;
+            }
+            //need to get the next host/node iterator
+            if (hostIterator != null && hostIterator.hasNext()) {
+                ObjectResourcesItem host = hostIterator.next();
+                final String hostId = host.id;
+                nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+                return nodeIterator;
+            }
+            if (rackIterator.hasNext()) {
+                ObjectResourcesItem rack = rackIterator.next();
+                final String rackId = rack.id;
+                hostIterator = parent.getSortedHostsForRack(rackId).iterator();
+                ObjectResourcesItem host = hostIterator.next();
+                final String hostId = host.id;
+                nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+                return nodeIterator;
+            }
+
+            return null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (pre.hasNext()) {
+                return true;
+            }
+            if (nextValueFromNode != null) {
+                return true;
+            }
+            while (true) {
+                //For the node we don't know if we have another one unless we look at the contents
+                Iterator<ObjectResourcesItem> nodeIterator = getNodeIterator();
+                if (nodeIterator == null || !nodeIterator.hasNext()) {
+                    break;
+                }
+                String tmp = nodeIterator.next().id;
+                if (!skip.contains(tmp)) {
+                    nextValueFromNode = tmp;
+                    return true;
+                }
+            }
+            return post.hasNext();
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            if (pre.hasNext()) {
+                return pre.next();
+            }
+            if (nextValueFromNode != null) {
+                String tmp = nextValueFromNode;
+                nextValueFromNode = null;
+                return tmp;
+            }
+            return post.next();
+        }
+    }
+
+    private class LazyNodeSorting implements Iterable<String> {
+        private final Map<String, AtomicInteger> perHostScheduledCount = new HashMap<>();
+        private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
+        private final Iterable<ObjectResourcesItem> sortedRacks;
+        private final Map<String, Iterable<ObjectResourcesItem>> cachedHosts = new HashMap<>();
+        private final Map<String, Iterable<ObjectResourcesItem>> cachedNodesByHost = new HashMap<>();
+        private final ExecutorDetails exec;
+        private final Set<String> skippedNodeIds = new HashSet<>();
+
+        LazyNodeSorting(ExecutorDetails exec) {
+            this.exec = exec;
+            skippedNodeIds.addAll(favoredNodeIds);
+            skippedNodeIds.addAll(unFavoredNodeIds);
+            skippedNodeIds.addAll(greyListedSupervisorIds);
+
+            String topoId = topologyDetails.getId();
+            SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+            if (assignment != null) {
+                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+                    assignment.getSlotToExecutors().entrySet()) {
+                    String superId = entry.getKey().getNodeId();
+                    String hostId = nodeIdToHostname.get(superId);
+                    perHostScheduledCount.computeIfAbsent(hostId, id -> new AtomicInteger(0))
+                        .getAndAdd(entry.getValue().size());
+                    perNodeScheduledCount.computeIfAbsent(superId, id -> new AtomicInteger(0))
+                        .getAndAdd(entry.getValue().size());
+                }
+            }
+            sortedRacks = getSortedRacks();
+        }
+
+        private Iterable<ObjectResourcesItem> getSortedHostsForRack(String rackId) {
+            return cachedHosts.computeIfAbsent(rackId,
+                id -> sortHosts(rackIdToHosts.getOrDefault(id, Collections.emptySet()), exec, id, perHostScheduledCount));
+        }
+
+        private Iterable<ObjectResourcesItem> getSortedNodesForHost(String hostId) {
+            return cachedNodesByHost.computeIfAbsent(hostId,
+                id -> sortNodes(hostnameToNodes.getOrDefault(id, Collections.emptyList()), exec, id, perNodeScheduledCount));
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return new LazyNodeSortingIterator(this, sortedRacks);
+        }
+    }
+
+    @Override
+    public Iterable<String> sortAllNodes() {
+        return new LazyNodeSorting(exec);
+    }
+
+    private ObjectResourcesSummary createClusterSummarizedResources() {
+        ObjectResourcesSummary clusterResourcesSummary = new ObjectResourcesSummary("Cluster");
+        rackIdToHosts.forEach((rackId, hostIds) -> {
+            if (hostIds == null || hostIds.isEmpty()) {
+                LOG.info("Ignoring Rack {} since it has no hosts", rackId);
+            } else {
+                ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
+                for (String hostId : hostIds) {
+                    for (RasNode node : hostnameToNodes(hostId)) {
+                        rack.availableResources.add(node.getTotalAvailableResources());
+                        rack.totalResources.add(node.getTotalResources());
+                    }
+                }
+                clusterResourcesSummary.addObjectResourcesItem(rack);
+            }
+        });
+
+        LOG.debug(
+            "Cluster Overall Avail [ {} ] Total [ {} ], rackCnt={}, hostCnt={}",
+            clusterResourcesSummary.getAvailableResourcesOverall(),
+            clusterResourcesSummary.getTotalResourcesOverall(),
+            clusterResourcesSummary.getObjectResources().size(),
+            rackIdToHosts.values().stream().mapToInt(x -> x.size()).sum());
+        return clusterResourcesSummary;
+    }
+
+    public Map<String, AtomicInteger> getScheduledExecCntByRackId() {
+        SchedulerAssignment assignment = cluster.getAssignmentById(topologyDetails.getId());
+        Map<String, AtomicInteger> scheduledCount = new HashMap<>();
+        if (assignment != null) {
+            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+                assignment.getSlotToExecutors().entrySet()) {
+                String superId = entry.getKey().getNodeId();
+                String rackId = superIdToRack.get(superId);
+                scheduledCount.computeIfAbsent(rackId, (rid) -> new AtomicInteger(0))
+                    .getAndAdd(entry.getValue().size());
+            }
+        }
+        return scheduledCount;
+    }
+
+    /**
+     * Racks are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled is already on the rack in descending order.
+     * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same rack as the existing executors of the
+     * topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a rack in descending order We calculate
+     * the resource availability percentage by dividing the resource availability on the rack by the resource availability of the  entire
+     * cluster By doing this calculation, racks that have exhausted or little of one of the resources mentioned above will be ranked after
+     * racks that have more balanced resource availability. So we will be less likely to pick a rack that have a lot of one resource but a
+     * low amount of another.
+     *
+     * @return an iterable of sorted racks
+     */
+    public Iterable<ObjectResourcesItem> getSortedRacks() {
+
+        final ObjectResourcesSummary clusterResourcesSummary = createClusterSummarizedResources();
+        final Map<String, AtomicInteger> scheduledCount = getScheduledExecCntByRackId();
+
+        return sortObjectResources(
+            clusterResourcesSummary,
+            exec,
+            (rackId) -> {
+                AtomicInteger count = scheduledCount.get(rackId);
+                if (count == null) {
+                    return 0;
+                }
+                return count.get();
+            });
+    }
+
+    /**
+     * hostname to Ids.
+     *
+     * @param hostname the hostname.
+     * @return the ids n that node.
+     */
+    public List<RasNode> hostnameToNodes(String hostname) {
+        return hostnameToNodes.getOrDefault(hostname, Collections.emptyList());
+    }
+
+    /**
+     * interface for calculating the number of existing executors scheduled on a object (rack or node).
+     */
+    public interface ExistingScheduleFunc {
+        int getNumExistingSchedule(String objectId);
+    }
+}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 11de468..dcffa99 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -189,13 +189,34 @@
         }
     }
 
-    public static Map<String, SupervisorDetails> genSupervisorsWithRacks(int numRacks, int numSupersPerRack, int numPorts, int rackStart,
-                                                                         int superInRackStart, double cpu, double mem,
-                                                                         Map<String, Double> miscResources) {
-        Map<String, Double> resourceMap = new HashMap<>();
-        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
-        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
-        resourceMap.putAll(miscResources);
+    public static Map<String, SupervisorDetails> genSupervisorsWithRacks(
+            int numRacks, int numSupersPerRack, int numPorts, int rackStart, int superInRackStart,
+            double cpu, double mem, Map<String, Double> miscResources) {
+
+        return genSupervisorsWithRacksAndNuma(numRacks, numSupersPerRack, 1, numPorts, rackStart,
+                superInRackStart, cpu, mem, miscResources, 1.0);
+    }
+
+    /**
+     * Takes one additional parameter numaZonesPerHost. This parameter determines how many supervisors
+     * will be created on the same host. If numaResourceMultiplier is set to a factor below 1.0, then
+     * each subsequent numa zone will have corresponding lower cpu/mem than previous numa zone.
+     *
+     * @param numRacks
+     * @param numSupersPerRack
+     * @param numaZonesPerHost
+     * @param numPorts
+     * @param rackStart
+     * @param superInRackStart
+     * @param cpu
+     * @param mem
+     * @param miscResources
+     * @param numaResourceMultiplier - cpu/mem resource for each numaZone is multiplied by this factor to obtain uneven resources
+     * @return
+     */
+    public static Map<String, SupervisorDetails> genSupervisorsWithRacksAndNuma(
+            int numRacks, int numSupersPerRack, int numaZonesPerHost, int numPorts, int rackStart, int superInRackStart,
+            double cpu, double mem, Map<String, Double> miscResources, double numaResourceMultiplier) {
         Map<String, SupervisorDetails> retList = new HashMap<>();
         for (int rack = rackStart; rack < numRacks + rackStart; rack++) {
             for (int superInRack = superInRackStart; superInRack < (numSupersPerRack + superInRackStart); superInRack++) {
@@ -203,8 +224,23 @@
                 for (int p = 0; p < numPorts; p++) {
                     ports.add(p);
                 }
-                SupervisorDetails sup = new SupervisorDetails(String.format("r%03ds%03d", rack, superInRack),
-                    String.format("host-%03d-rack-%03d", superInRack, rack), null, ports,
+                String superId;
+                String host;
+                int numaZone = superInRack % numaZonesPerHost;
+                if (numaZonesPerHost > 1) {
+                    // multiple supervisors per host
+                    int hostInRack = superInRack / numaZonesPerHost;
+                    superId = String.format("r%03ds%03dn%d", rack, superInRack, numaZone);
+                    host = String.format("host-%03d-rack-%03d", hostInRack, rack);
+                } else {
+                    superId = String.format("r%03ds%03d", rack, superInRack);
+                    host = String.format("host-%03d-rack-%03d", superInRack, rack);
+                }
+                Map<String, Double> resourceMap = new HashMap<>();
+                resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu * Math.pow(numaResourceMultiplier, numaZone));
+                resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem * Math.pow(numaResourceMultiplier, numaZone));
+                resourceMap.putAll(miscResources);
+                SupervisorDetails sup = new SupervisorDetails(superId, host, null, ports,
                     NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
                 retList.put(sup.getId(), sup);
 
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 96a0f9a..fce3065 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -22,6 +22,9 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
@@ -36,6 +39,7 @@
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByConstraintSeverity;
 import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.json.simple.JSONValue;
@@ -77,6 +81,12 @@
 
     public TestConstraintSolverStrategy(boolean consolidatedConfigFlag) {
         this.consolidatedConfigFlag = consolidatedConfigFlag;
+        List<Class> classesToDebug = Arrays.asList(TestConstraintSolverStrategy.class,
+                BaseResourceAwareStrategy.class, ResourceAwareScheduler.class,
+                NodeSorterHostProximity.class, Cluster.class
+        );
+        Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose otherwise Level.INFO
+        classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
         LOG.info("Running tests with consolidatedConfigFlag={}", consolidatedConfigFlag);
     }
 
@@ -180,15 +190,15 @@
         return makeTestTopoConf(1);
     }
 
-    public TopologyDetails makeTopology(Map<String, Object> config, int boltParallel) {
+    public static TopologyDetails makeTopology(Map<String, Object> config, int boltParallel) {
         return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, "user");
     }
 
-    public Cluster makeCluster(Topologies topologies) {
+    public static Cluster makeCluster(Topologies topologies) {
         return makeCluster(topologies, null);
     }
 
-    public Cluster makeCluster(Topologies topologies, Map<String, SupervisorDetails> supMap) {
+    public static Cluster makeCluster(Topologies topologies, Map<String, SupervisorDetails> supMap) {
         if (supMap == null) {
             supMap = genSupervisors(4, 2, 120, 1200);
         }
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 1b1c035..b6f2a20 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,6 +44,7 @@
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
 import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
 import org.apache.storm.topology.SharedOffHeapWithinNode;
 import org.apache.storm.topology.SharedOffHeapWithinWorker;
 import org.apache.storm.topology.SharedOnHeap;
@@ -74,7 +75,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
+
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 
@@ -373,6 +375,15 @@
         SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
         TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
         long numNodes = assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+        String assignmentString = "Assignments:\n\t" + assignment.getSlotToExecutors().entrySet().stream()
+                .map(x -> String.format("Node=%s, components=%s",
+                        x.getKey().getNodeId(),
+                        x.getValue().stream()
+                                .map(y -> topo.getComponentFromExecutor(y))
+                                .collect(Collectors.joining(","))
+                        )
+                )
+                .collect(Collectors.joining("\n\t"));
 
         if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_NONE) {
             // Everything should fit in a single slot
@@ -411,7 +422,7 @@
             int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
             assertThat(numAssignedWorkers, is(8));
             assertThat(assignment.getSlots().size(), is(8));
-            assertThat(numNodes, is(2L));
+            assertThat(assignmentString, numNodes, is(2L));
         } else if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT) {
             double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
             double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
@@ -766,22 +777,17 @@
         for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
             String hostName = entry.getKey();
             String rack = entry.getValue();
-            List<String> nodesForRack = rackToNodes.get(rack);
-            if (nodesForRack == null) {
-                nodesForRack = new ArrayList<>();
-                rackToNodes.put(rack, nodesForRack);
-            }
-            nodesForRack.add(hostName);
+            rackToNodes.computeIfAbsent(rack, rid -> new ArrayList<>()).add(hostName);
         }
         cluster.setNetworkTopography(rackToNodes);
 
         DefaultResourceAwareStrategyOld rs = new DefaultResourceAwareStrategyOld();
         
         rs.prepareForScheduling(cluster, topo1);
-        INodeSorter nodeSorter = new NodeSorter(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
-        TreeSet<ObjectResourcesItem> sortedRacks = nodeSorter.sortRacks(null);
+        INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        Iterable<ObjectResourcesItem> sortedRacks = nodeSorter.getSortedRacks();
 
-        Assert.assertEquals("# of racks sorted", 6, sortedRacks.size());
         Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
         // Ranked first since rack-0 has the most balanced set of resources
         Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
@@ -904,10 +910,10 @@
         DefaultResourceAwareStrategyOld rs = new DefaultResourceAwareStrategyOld();
 
         rs.prepareForScheduling(cluster, topo1);
-        INodeSorter nodeSorter = new NodeSorter(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
-        TreeSet<ObjectResourcesItem> sortedRacks= nodeSorter.sortRacks(null);
+        INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        Iterable<ObjectResourcesItem> sortedRacks= nodeSorter.getSortedRacks();
 
-        Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
         Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
         // Ranked first since rack-0 has the most balanced set of resources
         Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
new file mode 100644
index 0000000..84b839f
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
@@ -0,0 +1,1036 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNodes;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@ExtendWith({NormalizedResourcesExtension.class})
+public class TestNodeSorterHostProximity {
+    private static final Logger LOG = LoggerFactory.getLogger(TestNodeSorterHostProximity.class);
+    private static final int CURRENT_TIME = 1450418597;
+
+    protected Class getDefaultResourceAwareStrategyClass() {
+        return DefaultResourceAwareStrategy.class;
+    }
+
+    private Config createClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
+                                       Map<String, Map<String, Number>> pools) {
+        Config config = TestUtilsForResourceAwareScheduler.createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, getDefaultResourceAwareStrategyClass().getName());
+        return config;
+    }
+
+    private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
+        private final Map<String, String> hostToRackMap;
+        private final Map<String, List<String>> rackToHosts;
+
+        @SafeVarargs
+        public TestDNSToSwitchMapping(Map<String, SupervisorDetails>... racks) {
+            Set<String> seenHosts = new HashSet<>();
+            Map<String, String> hostToRackMap = new HashMap<>();
+            Map<String, List<String>> rackToHosts = new HashMap<>();
+            for (int rackNum = 0; rackNum < racks.length; rackNum++) {
+                String rack = String.format("rack-%03d", rackNum);
+                for (SupervisorDetails sup : racks[rackNum].values()) {
+                    hostToRackMap.put(sup.getHost(), rack);
+                    String host = sup.getHost();
+                    if (!seenHosts.contains(host)) {
+                        rackToHosts.computeIfAbsent(rack, rid -> new ArrayList<>()).add(host);
+                        seenHosts.add(host);
+                    }
+                }
+            }
+            this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+            this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+        }
+
+        /**
+         * Use the "rack-%03d" embedded in the name of the supervisor to determine the rack number.
+         *
+         * @param supervisorDetailsCollection
+         */
+        public TestDNSToSwitchMapping(Collection<SupervisorDetails> supervisorDetailsCollection) {
+            Set<String> seenHosts = new HashSet<>();
+            Map<String, String> hostToRackMap = new HashMap<>();
+            Map<String, List<String>> rackToHosts = new HashMap<>();
+
+            for (SupervisorDetails supervisorDetails: supervisorDetailsCollection) {
+                String rackId = supervisorIdToRackName(supervisorDetails.getId());
+                hostToRackMap.put(supervisorDetails.getHost(), rackId);
+                String host = supervisorDetails.getHost();
+                if (!seenHosts.contains(host)) {
+                    rackToHosts.computeIfAbsent(rackId, rid -> new ArrayList<>()).add(host);
+                    seenHosts.add(host);
+                }
+            }
+            this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+            this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+        }
+
+        @Override
+        public Map<String, String> resolve(List<String> names) {
+            return hostToRackMap;
+        }
+
+        public Map<String, List<String>> getRackToHosts() {
+            return rackToHosts;
+        }
+    }
+
+    /**
+     * Test whether strategy will choose correct rack.
+     */
+    @Test
+    public void testMultipleRacks() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 1;
+        final double numaResourceMultiplier = 1.0;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+
+        final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate another rack of supervisors with less resources
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some supervisors that are depleted of one resource
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some that has a lot of memory but little of cpu
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                10, 8000 * 2 + 4000, Collections.emptyMap(),numaResourceMultiplier);
+
+        //generate some that has a lot of cpu but little of memory
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                400 + 200 + 10, 1000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //Generate some that have neither resource, to verify that the strategy will prioritize this last
+        //Also put a generic resource with 0 value in the resources list, to verify that it doesn't affect the sorting
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack5 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                0.0, 0.0, Collections.singletonMap("gpu.count", 0.0), numaResourceMultiplier);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+        supMap.putAll(supMapRack5);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+        TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        List<String> supHostnames = new LinkedList<>();
+        for (SupervisorDetails sup : supMap.values()) {
+            supHostnames.add(sup.getHost());
+        }
+        Map<String, List<String>> rackToHosts = testDNSToSwitchMapping.getRackToHosts();
+        cluster.setNetworkTopography(rackToHosts);
+
+        NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        List<ObjectResourcesItem> sortedRacks = StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+                .collect(Collectors.toList());
+        String rackSummaries = sortedRacks.stream()
+                .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                .collect(Collectors.joining("\n\t"));
+        Assert.assertEquals(rackSummaries + "\n# of racks sorted", 6, sortedRacks.size());
+        Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+        Assert.assertEquals(rackSummaries + "\nrack-000 should be ordered first since it has the most balanced set of resources", "rack-000", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-001 should be ordered second since it has a balanced set of resources but less than rack-000", "rack-001", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-004 should be ordered third since it has a lot of cpu but not a lot of memory", "rack-004", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-003 should be ordered fourth since it has a lot of memory but not cpu", "rack-003", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-002 should be ordered fifth since it has not cpu resources", "rack-002", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nRack-005 should be ordered sixth since it has neither CPU nor memory available", "rack-005", it.next().id);
+    }
+
+    /**
+     * Test whether strategy will choose correct rack.
+     */
+    @Test
+    public void testMultipleRacksWithFavoritism() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 2;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+        final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                400, 8000, Collections.emptyMap(), 1.0);
+
+        //generate another rack of supervisors with less resources
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                200, 4000, Collections.emptyMap(), 1.0);
+
+        //generate some supervisors that are depleted of one resource
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                0, 8000, Collections.emptyMap(), 1.0);
+
+        //generate some that has a lot of memory but little of cpu
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                10, 8000 * 2 + 4000, Collections.emptyMap(), 1.0);
+
+        //generate some that has a lot of cpu but little of memory
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                400 + 200 + 10, 1000, Collections.emptyMap(), 1.0);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4);
+
+        Config t1Conf = new Config();
+        t1Conf.putAll(config);
+        final List<String> t1FavoredHostNames = Arrays.asList("host-41", "host-42", "host-43");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, t1FavoredHostNames);
+        final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", "host-2", "host-3");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, t1UnfavoredHostIds);
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+
+        Config t2Conf = new Config();
+        t2Conf.putAll(config);
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, Arrays.asList("host-31", "host-32", "host-33"));
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, Arrays.asList("host-11", "host-12", "host-13"));
+        TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        List<String> supHostnames = new LinkedList<>();
+        for (SupervisorDetails sup : supMap.values()) {
+            supHostnames.add(sup.getHost());
+        }
+        Map<String, List<String>> rackToHosts = testDNSToSwitchMapping.getRackToHosts();
+        cluster.setNetworkTopography(rackToHosts);
+
+        NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        List<ObjectResourcesItem> sortedRacks = StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+                .collect(Collectors.toList());
+        String rackSummaries = sortedRacks.stream()
+                .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                .collect(Collectors.joining("\n\t"));
+
+        Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+        // Ranked first since rack-000 has the most balanced set of resources
+        Assert.assertEquals("rack-000 should be ordered first", "rack-000", it.next().id);
+        // Ranked second since rack-1 has a balanced set of resources but less than rack-0
+        Assert.assertEquals("rack-001 should be ordered second", "rack-001", it.next().id);
+        // Ranked third since rack-4 has a lot of cpu but not a lot of memory
+        Assert.assertEquals("rack-004 should be ordered third", "rack-004", it.next().id);
+        // Ranked fourth since rack-3 has alot of memory but not cpu
+        Assert.assertEquals("rack-003 should be ordered fourth", "rack-003", it.next().id);
+        //Ranked last since rack-2 has not cpu resources
+        Assert.assertEquals("rack-00s2 should be ordered fifth", "rack-002", it.next().id);
+    }
+
+    /**
+     * Test if hosts are presented together regardless of resource availability.
+     * Supervisors are created with multiple Numa zones in such a manner that resources on two numa zones on the same host
+     * differ widely in resource availability.
+     */
+    @Test
+    public void testMultipleRacksWithHostProximity() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 12;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 3;
+        final double numaResourceMultiplier = 0.4;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+
+        final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate another rack of supervisors with less resources
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some supervisors that are depleted of one resource
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some that has a lot of memory but little of cpu
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                10, 8000 * 2 + 4000, Collections.emptyMap(),numaResourceMultiplier);
+
+        //generate some that has a lot of cpu but little of memory
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                400 + 200 + 10, 1000, Collections.emptyMap(), numaResourceMultiplier);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4);
+
+        Config t1Conf = new Config();
+        t1Conf.putAll(config);
+        final List<String> t1FavoredHostNames = Arrays.asList("host-41", "host-42", "host-43");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, t1FavoredHostNames);
+        final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", "host-2", "host-3");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, t1UnfavoredHostIds);
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+
+        Config t2Conf = new Config();
+        t2Conf.putAll(config);
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, Arrays.asList("host-31", "host-32", "host-33"));
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, Arrays.asList("host-11", "host-12", "host-13"));
+        TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1);
+        nodeSorter.prepare(null);
+
+        Set<String> seenHosts = new HashSet<>();
+        String prevHost = null;
+        List<String> errLines = new ArrayList();
+        Map<String, String> nodeToHost = new RasNodes(cluster).getNodeIdToHostname();
+        for (String nodeId: nodeSorter.sortAllNodes()) {
+            String host = nodeToHost.getOrDefault(nodeId, "no-host-for-node-" + nodeId);
+            errLines.add(String.format("\tnodeId:%s, host:%s", nodeId, host));
+            if (!host.equals(prevHost) && seenHosts.contains(host)) {
+                String err = String.format("Host %s for node %s is out of order:\n\t%s", host, nodeId, String.join("\n\t", errLines));
+                Assert.fail(err);
+            }
+            seenHosts.add(host);
+            prevHost = host;
+        }
+    }
+
+    /**
+     * Racks should be returned in order of decreasing capacity.
+     */
+    @Test
+    public void testMultipleRacksOrderedByCapacity() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 1;
+        final double numaResourceMultiplier = 1.0;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+
+        final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                600, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                500, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                400, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                300, 8000 - rackStartNum, Collections.emptyMap(),numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                200, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+        // too small to hold topology
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack5 = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+                100, 8000 - rackStartNum, Collections.singletonMap("gpu.count", 0.0), numaResourceMultiplier);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+        supMap.putAll(supMapRack5);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+        TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, topo1);
+        nodeSorter.prepare(null);
+        List<ObjectResourcesItem> sortedRacks = StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+                .collect(Collectors.toList());
+        String rackSummaries = sortedRacks
+                .stream()
+                .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                .collect(Collectors.joining("\n\t"));
+        NormalizedResourceRequest topoResourceRequest = topo1.getApproximateTotalResources();
+        String topoRequest = String.format("Topo %s, approx-requested-resources %s", topo1.getId(), topoResourceRequest.toString());
+        Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nRack-000 should be ordered first since it has the largest capacity", "rack-000", it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-001 should be ordered second since it smaller than rack-000", "rack-001", it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-002 should be ordered third since it is smaller than rack-001", "rack-002", it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-003 should be ordered fourth since it since it is smaller than rack-002", "rack-003", it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-004 should be ordered fifth since it since it is smaller than rack-003", "rack-004", it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-005 should be ordered last since it since it is has smallest capacity", "rack-005", it.next().id);
+    }
+
+    /**
+     * Schedule two topologies, once with special resources and another without.
+     * There are enough special resources to hold one topology with special resource ("my.gpu").
+     * If the sort order is incorrect, scheduling will not succeed.
+     */
+    @Test
+    public void testAntiAffinityWithMultipleTopologies() {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(1, 40, 66, 0, 0, 4700, 226200, new HashMap<>());
+        HashMap<String, Double> extraResources = new HashMap<>();
+        extraResources.put("my.gpu", 1.0);
+        supMap.putAll(genSupervisorsWithRacks(1, 40, 66, 1, 0, 4700, 226200, extraResources));
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
+                5, 100, 300, 0, 0, "user", 8192);
+
+        //Schedule the simple topology first
+        Topologies topologies = new Topologies(tdSimple);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        {
+            NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdSimple);
+            for (ExecutorDetails exec : tdSimple.getExecutors()) {
+                nodeSorter.prepare(exec);
+                List<ObjectResourcesItem> sortedRacks = StreamSupport
+                        .stream(nodeSorter.getSortedRacks().spliterator(), false)
+                        .collect(Collectors.toList());
+                String rackSummaries = StreamSupport
+                        .stream(sortedRacks.spliterator(), false)
+                        .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                                x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                                x.minResourcePercent, x.avgResourcePercent,
+                                x.availableResources.getTotalCpu(),
+                                x.availableResources.getTotalMemoryMb()))
+                        .collect(Collectors.joining("\n\t"));
+                NormalizedResourceRequest topoResourceRequest = tdSimple.getApproximateTotalResources();
+                String topoRequest = String.format("Topo %s, approx-requested-resources %s", tdSimple.getId(), topoResourceRequest.toString());
+                Assert.assertEquals(rackSummaries + "\n# of racks sorted", 2, sortedRacks.size());
+                Assert.assertEquals(rackSummaries + "\nFirst rack sorted", "rack-000", sortedRacks.get(0).id);
+                Assert.assertEquals(rackSummaries + "\nSecond rack sorted", "rack-001", sortedRacks.get(1).id);
+            }
+        }
+
+        scheduler.schedule(topologies, cluster);
+
+        TopologyBuilder builder = topologyBuilder(1, 5, 100, 300);
+        builder.setBolt("gpu-bolt", new TestBolt(), 40)
+                .addResource("my.gpu", 1.0)
+                .shuffleGrouping("spout-0");
+        TopologyDetails tdGpu = topoToTopologyDetails("topology-gpu", config, builder.createTopology(), 0, 0,"user", 8192);
+
+        //Now schedule GPU but with the simple topology in place.
+        topologies = new Topologies(tdSimple, tdGpu);
+        cluster = new Cluster(cluster, topologies);
+        {
+            NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdGpu);
+            for (ExecutorDetails exec : tdGpu.getExecutors()) {
+                String comp = tdGpu.getComponentFromExecutor(exec);
+                nodeSorter.prepare(exec);
+                List<ObjectResourcesItem> sortedRacks = StreamSupport
+                        .stream(nodeSorter.getSortedRacks().spliterator(), false).collect(Collectors.toList());
+                String rackSummaries = sortedRacks.stream()
+                        .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                                x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                                x.minResourcePercent, x.avgResourcePercent,
+                                x.availableResources.getTotalCpu(),
+                                x.availableResources.getTotalMemoryMb()))
+                        .collect(Collectors.joining("\n\t"));
+                NormalizedResourceRequest topoResourceRequest = tdSimple.getApproximateTotalResources();
+                String topoRequest = String.format("Topo %s, approx-requested-resources %s", tdSimple.getId(), topoResourceRequest.toString());
+                Assert.assertEquals(rackSummaries + "\n# of racks sorted", 2, sortedRacks.size());
+                if (comp.equals("gpu-bolt")) {
+                    Assert.assertEquals(rackSummaries + "\nFirst rack sorted for " + comp, "rack-001", sortedRacks.get(0).id);
+                    Assert.assertEquals(rackSummaries + "\nSecond rack sorted for " + comp, "rack-000", sortedRacks.get(1).id);
+                } else {
+                    Assert.assertEquals(rackSummaries + "\nFirst rack sorted for " + comp, "rack-000", sortedRacks.get(0).id);
+                    Assert.assertEquals(rackSummaries + "\nSecond rack sorted for " + comp, "rack-001", sortedRacks.get(1).id);
+                }
+            }
+        }
+
+        scheduler.schedule(topologies, cluster);
+
+        Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+        assertEquals(2, assignments.size());
+
+        Map<String, Map<String, AtomicLong>> topoPerRackCount = new HashMap<>();
+        for (Map.Entry<String, SchedulerAssignment> entry: assignments.entrySet()) {
+            SchedulerAssignment sa = entry.getValue();
+            Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+            for (WorkerSlot slot : sa.getSlots()) {
+                String nodeId = slot.getNodeId();
+                String rack = supervisorIdToRackName(nodeId);
+                slotsPerRack.computeIfAbsent(rack, (r) -> new AtomicLong(0)).incrementAndGet();
+            }
+            LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+            topoPerRackCount.put(entry.getKey(), slotsPerRack);
+        }
+
+        Map<String, AtomicLong> simpleCount = topoPerRackCount.get("topology-simple-0");
+        assertNotNull(simpleCount);
+        //Because the simple topology was scheduled first we want to be sure that it didn't put anything on
+        // the GPU nodes.
+        assertEquals(1, simpleCount.size()); //Only 1 rack is in use
+        assertFalse(simpleCount.containsKey("r001")); //r001 is the second rack with GPUs
+        assertTrue(simpleCount.containsKey("r000")); //r000 is the first rack with no GPUs
+
+        //We don't really care too much about the scheduling of topology-gpu-0, because it was scheduled.
+    }
+
+    /**
+     * Free one-fifth of WorkerSlots.
+     */
+    private void freeSomeWorkerSlots(Cluster cluster) {
+        Map<String, SchedulerAssignment> assignmentMap = cluster.getAssignments();
+        for (SchedulerAssignment schedulerAssignment: assignmentMap.values()) {
+            int i = 0;
+            List<WorkerSlot> slotsToKill = new ArrayList<>();
+            for (WorkerSlot workerSlot: schedulerAssignment.getSlots()) {
+                i++;
+                if (i % 5 == 0) {
+                    slotsToKill.add(workerSlot);
+                }
+            }
+            cluster.freeSlots(slotsToKill);
+        }
+    }
+
+    /**
+     * If the topology is too large for one rack, it should be partially scheduled onto the next rack (and next rack only).
+     */
+    @Test
+    public void testFillUpRackAndSpilloverToNextRack() {
+        INimbus iNimbus = new INimbusTest();
+        double compPcore = 100;
+        double compOnHeap = 775;
+        double compOffHeap = 25;
+        int topo1NumSpouts = 1;
+        int topo1NumBolts = 5;
+        int topo1SpoutParallelism = 100;
+        int topo1BoltParallelism = 200;
+        final int numRacks = 3;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 6;
+        final int numZonesPerHost = 1;
+        final double numaResourceMultiplier = 1.0;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+        long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism) * 4/5; // not enough for topo1
+        long compPerSuper =  compPerRack / numSupersPerRack;
+        double cpuPerSuper = compPcore * compPerSuper;
+        double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+        double topo1MaxHeapSize = memPerSuper;
+        final String topoName1 = "topology1";
+
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum, supStartNum,
+                cpuPerSuper, memPerSuper, Collections.emptyMap(), numaResourceMultiplier);
+        TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+                topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+        //Schedule the topo1 topology and ensure it fits on 2 racks
+        Topologies topologies = new Topologies(td1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        scheduler.schedule(topologies, cluster);
+        Set<String> assignedRacks = cluster.getAssignedRacks(td1.getId());
+        assertEquals("Racks for topology=" + td1.getId() + " is " + assignedRacks, 2, assignedRacks.size());
+    }
+
+    /**
+     * Rack with low resources should be used to schedule an executor if it has other executors for the same topology.
+     * <li>Schedule topo1 on one rack</li>
+     * <li>unassign some executors</li>
+     * <li>schedule another topology to partially fill up rack1</li>
+     * <li>Add another rack and schedule topology 1 remaining executors again</li>
+     * <li>scheduling should utilize all resources on rack1 before before trying next rack</li>
+     */
+    @Test
+    public void testPreferRackWithTopoExecutors() {
+        INimbus iNimbus = new INimbusTest();
+        double compPcore = 100;
+        double compOnHeap = 775;
+        double compOffHeap = 25;
+        int topo1NumSpouts = 1;
+        int topo1NumBolts = 5;
+        int topo1SpoutParallelism = 100;
+        int topo1BoltParallelism = 200;
+        int topo2NumSpouts = 1;
+        int topo2NumBolts = 5;
+        int topo2SpoutParallelism = 10;
+        int topo2BoltParallelism = 20;
+        final int numRacks = 3;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 6;
+        final int numZonesPerHost = 1;
+        final double numaResourceMultiplier = 1.0;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+        long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism
+                + topo2NumSpouts * topo2SpoutParallelism); // enough for topo1 but not topo1+topo2
+        long compPerSuper =  compPerRack / numSupersPerRack;
+        double cpuPerSuper = compPcore * compPerSuper;
+        double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+        double topo1MaxHeapSize = memPerSuper;
+        double topo2MaxHeapSize = memPerSuper;
+        final String topoName1 = "topology1";
+        final String topoName2 = "topology2";
+
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum, supStartNum,
+                cpuPerSuper, memPerSuper, Collections.emptyMap(), numaResourceMultiplier);
+        TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+                topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+        //Schedule the topo1 topology and ensure it fits on 1 rack
+        Topologies topologies = new Topologies(td1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        scheduler.schedule(topologies, cluster);
+        Set<String> assignedRacks = cluster.getAssignedRacks(td1.getId());
+        assertEquals("Racks for topology=" + td1.getId() + " is " + assignedRacks, 1, assignedRacks.size());
+
+        TopologyBuilder builder = topologyBuilder(topo2NumSpouts, topo2NumBolts, topo2SpoutParallelism, topo2BoltParallelism);
+        TopologyDetails td2 = topoToTopologyDetails(topoName2, config, builder.createTopology(), 0, 0,"user", topo2MaxHeapSize);
+
+        //Now schedule GPU but with the simple topology in place.
+        topologies = new Topologies(td1, td2);
+        cluster = new Cluster(cluster, topologies);
+        scheduler.schedule(topologies, cluster);
+
+        assignedRacks = cluster.getAssignedRacks(td1.getId(), td2.getId());
+        assertEquals("Racks for topologies=" + td1.getId() + "/" + td2.getId() + " is " + assignedRacks, 2, assignedRacks.size());
+
+        // topo2 gets scheduled on its own rack because it is empty and available
+        assignedRacks = cluster.getAssignedRacks(td2.getId());
+        assertEquals("Racks for topologies=" + td2.getId() + " is " + assignedRacks, 1, assignedRacks.size());
+
+        // now unassign topo2, expect only one rack to be in use; free some slots and reschedule topo1 some topo1 executors
+        cluster.unassign(td2.getId());
+        assignedRacks = cluster.getAssignedRacks(td2.getId());
+        assertEquals("After unassigning topology " + td2.getId() + ", racks for topology=" + td2.getId() + " is " + assignedRacks,
+                0, assignedRacks.size());
+        assignedRacks = cluster.getAssignedRacks(td1.getId());
+        assertEquals("After unassigning topology " + td2.getId() + ", racks for topology=" + td1.getId() + " is " + assignedRacks,
+                1, assignedRacks.size());
+        assertFalse("Topology " + td1.getId() + " should be fully assigned before freeing slots", cluster.needsSchedulingRas(td1));
+        freeSomeWorkerSlots(cluster);
+        assertTrue("Topology " + td1.getId() + " should need scheduling after freeing slots", cluster.needsSchedulingRas(td1));
+
+        // then reschedule executors
+        scheduler.schedule(topologies, cluster);
+
+        // only one rack should be in use by topology1
+        assignedRacks = cluster.getAssignedRacks(td1.getId());
+        assertEquals("After reassigning topology " + td2.getId() + ", racks for topology=" + td1.getId() + " is " + assignedRacks,
+                1, assignedRacks.size());
+    }
+
+    /**
+     * Assign and then clear out a rack to host list mapping in cluster.networkTopography.
+     * Expected behavior is that:
+     *  <li>the rack without hosts does not show up in {@link NodeSorterHostProximity#getSortedRacks()}</li>
+     *  <li>all the supervisor nodes still get returned in {@link NodeSorterHostProximity#sortAllNodes()} ()}</li>
+     *  <li>supervisors on cleared rack show up under {@link DNSToSwitchMapping#DEFAULT_RACK}</li>
+     *
+     *  <p>
+     *      Force an usual condition, where one of the racks is still passed to LazyNodeSortingIterator with
+     *      an empty list and then ensure that code is resilient.
+     *  </p>
+     */
+    @Test
+    void testWithImpairedClusterNetworkTopography() {
+        INimbus iNimbus = new INimbusTest();
+        double compPcore = 100;
+        double compOnHeap = 775;
+        double compOffHeap = 25;
+        int topo1NumSpouts = 1;
+        int topo1NumBolts = 5;
+        int topo1SpoutParallelism = 100;
+        int topo1BoltParallelism = 200;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 66;
+        long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism + 10);
+        long compPerSuper =  compPerRack / numSupersPerRack;
+        double cpuPerSuper = compPcore * compPerSuper;
+        double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+        double topo1MaxHeapSize = memPerSuper;
+        final String topoName1 = "topology1";
+        int numRacks = 3;
+
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(numRacks, numSupersPerRack,  numPortsPerSuper,
+            0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+        TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+            topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+        Topologies topologies = new Topologies(td1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        Map<String, List<String>> networkTopography = cluster.getNetworkTopography();
+        assertEquals("Expecting " + numRacks + " racks found " + networkTopography.size(), numRacks, networkTopography.size());
+        assertTrue("Expecting racks count to be >= 3, found " + networkTopography.size(), networkTopography.size() >= 3);
+
+        // Impair cluster.networkTopography and set one rack to have zero hosts, getSortedRacks should exclude this rack.
+        // Keep, the supervisorDetails unchanged - confirm that these nodes are not lost even with incomplete networkTopography
+        String rackIdToZero = networkTopography.keySet().stream().findFirst().get();
+        impairClusterRack(cluster, rackIdToZero, true, false);
+
+        NodeSorterHostProximity nodeSorterHostProximity = new NodeSorterHostProximity(cluster, td1);
+        nodeSorterHostProximity.getSortedRacks().forEach(x -> assertNotEquals(x.id, rackIdToZero));
+
+        // confirm that the above action has not lost the hosts and that they appear under the DEFAULT rack
+        {
+            Set<String> seenRacks = new HashSet<>();
+            nodeSorterHostProximity.getSortedRacks().forEach(x -> seenRacks.add(x.id));
+            assertEquals("Expecting rack cnt to be still " + numRacks, numRacks, seenRacks.size());
+            assertTrue("Expecting to see default-rack=" + DNSToSwitchMapping.DEFAULT_RACK + " in sortedRacks",
+                seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+        }
+
+        // now check if node/supervisor is missing when sorting all nodes
+        Set<String> expectedNodes = supMap.keySet();
+        Set<String> seenNodes = new HashSet<>();
+        nodeSorterHostProximity.prepare(null);
+        nodeSorterHostProximity.sortAllNodes().forEach( n -> seenNodes.add(n));
+        assertEquals("Expecting see all supervisors ", expectedNodes, seenNodes);
+
+        // Now fully impair the cluster - confirm no default rack
+        {
+            cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+            cluster.setNetworkTopography(new TestDNSToSwitchMapping(supMap.values()).getRackToHosts());
+            impairClusterRack(cluster, rackIdToZero, true, true);
+            Set<String> seenRacks = new HashSet<>();
+            NodeSorterHostProximity nodeSorterHostProximity2 = new NodeSorterHostProximity(cluster, td1);
+            nodeSorterHostProximity2.getSortedRacks().forEach(x -> seenRacks.add(x.id));
+            Map<String, Set<String>> rackIdToHosts = nodeSorterHostProximity2.getRackIdToHosts();
+            String dumpOfRacks = rackIdToHosts.entrySet().stream()
+                .map(x -> String.format("rack %s -> hosts [%s]", x.getKey(), String.join(",", x.getValue())))
+                .collect(Collectors.joining("\n\t"));
+            assertEquals("Expecting rack cnt to be " + (numRacks - 1) + " but found " + seenRacks.size() + "\n\t" + dumpOfRacks,
+                numRacks - 1, seenRacks.size());
+            assertFalse("Found default-rack=" + DNSToSwitchMapping.DEFAULT_RACK + " in \n\t" + dumpOfRacks,
+                seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+        }
+    }
+
+    /**
+     * Black list all nodes for a rack before sorting nodes.
+     * Confirm that {@link NodeSorterHostProximity#sortAllNodes()} still works.
+     *
+     */
+    @Test
+    void testWithBlackListedHosts() {
+        INimbus iNimbus = new INimbusTest();
+        double compPcore = 100;
+        double compOnHeap = 775;
+        double compOffHeap = 25;
+        int topo1NumSpouts = 1;
+        int topo1NumBolts = 5;
+        int topo1SpoutParallelism = 100;
+        int topo1BoltParallelism = 200;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 66;
+        long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism + 10);
+        long compPerSuper =  compPerRack / numSupersPerRack;
+        double cpuPerSuper = compPcore * compPerSuper;
+        double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+        double topo1MaxHeapSize = memPerSuper;
+        final String topoName1 = "topology1";
+        int numRacks = 3;
+
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(numRacks, numSupersPerRack,  numPortsPerSuper,
+            0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+        TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+            topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+        Topologies topologies = new Topologies(td1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        Map<String, List<String>> networkTopography = cluster.getNetworkTopography();
+        assertEquals("Expecting " + numRacks + " racks found " + networkTopography.size(), numRacks, networkTopography.size());
+        assertTrue("Expecting racks count to be >= 3, found " + networkTopography.size(), networkTopography.size() >= 3);
+
+        Set<String> blackListedHosts = new HashSet<>();
+        List<SupervisorDetails> supArray = new ArrayList<>(supMap.values());
+        for (int i = 0 ; i < numSupersPerRack ; i++) {
+            blackListedHosts.add(supArray.get(i).getHost());
+        }
+        blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster, td1);
+
+        String rackToClear = cluster.getNetworkTopography().keySet().stream().findFirst().get();
+        blackListedHosts = new HashSet<>(cluster.getNetworkTopography().get(rackToClear));
+        blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster, td1);
+    }
+
+    // Impair cluster by blacklisting some hosts
+    private void blacklistHostsAndSortNodes(
+        Set<String> blackListedHosts, Collection<SupervisorDetails> sups, Cluster cluster, TopologyDetails td1) {
+        LOG.info("blackListedHosts={}", blackListedHosts);
+        cluster.setBlacklistedHosts(blackListedHosts);
+
+        NodeSorterHostProximity nodeSorterHostProximity = new NodeSorterHostProximity(cluster, td1);
+        // confirm that the above action loses hosts
+        {
+            Set<String> allHosts = sups.stream().map(x -> x.getHost()).collect(Collectors.toSet());
+            Set<String> seenRacks = new HashSet<>();
+            nodeSorterHostProximity.getSortedRacks().forEach(x -> seenRacks.add(x.id));
+            Set<String> seenHosts = new HashSet<>();
+            nodeSorterHostProximity.getRackIdToHosts().forEach((k,v) -> seenHosts.addAll(v));
+            allHosts.removeAll(seenHosts);
+            assertEquals("Expecting only blacklisted hosts removed", allHosts, blackListedHosts);
+        }
+
+        // now check if sortAllNodes still works
+        Set<String> expectedNodes = sups.stream()
+            .filter(x -> !blackListedHosts.contains(x.getHost()))
+            .map(x ->x.getId())
+            .collect(Collectors.toSet());
+        Set<String> seenNodes = new HashSet<>();
+            nodeSorterHostProximity.prepare(null);
+            nodeSorterHostProximity.sortAllNodes().forEach( n -> seenNodes.add(n));
+        assertEquals("Expecting see all supervisors ", expectedNodes, seenNodes);
+    }
+
+    /**
+     * Impair the cluster for a specified rackId.
+     *  <li>making the host list a zero length</li>
+     *  <li>removing supervisors for the hosts on the rack</li>
+     *
+     * @param cluster cluster to impair
+     * @param rackId rackId to clear
+     * @param clearNetworkTopography if true, then clear (but not remove) the hosts in list for the rack.
+     * @param clearSupervisorMap if true, then remove supervisors for the rack.
+     */
+    private void impairClusterRack(Cluster cluster, String rackId, boolean clearNetworkTopography, boolean clearSupervisorMap) {
+        Set<String> hostIds = new HashSet<>(cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()));
+        if (clearNetworkTopography) {
+            cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()).clear();
+        }
+        if (clearSupervisorMap) {
+            Set<String> supToRemove = new HashSet<>();
+            for (String hostId: hostIds) {
+                cluster.getSupervisorsByHost(hostId).forEach(s -> supToRemove.add(s.getId()));
+            }
+            Map<String, SupervisorDetails> supervisorDetailsMap = cluster.getSupervisors();
+            for (String supId: supToRemove) {
+                supervisorDetailsMap.remove(supId);
+            }
+        }
+    }
+}
\ No newline at end of file