[STORM-3739] Scheduling should sort numa zones by host groups (#3379)
* [STORM-3739] Scheduling should sort numa zones by host groups
* [STORM-3739] Ignore blacklisted hosts and corresponding nodes/supervisors.
* [STORM-3739] Javadoc fix.
* [STORM-3739] Add check for null hostname, remove unused code, set totalResources.
* [STORM-3739] Remove unused methods.
* [STORM-3739] Sort hosts comparing average resources before comparing min resources.
* [STORM-3739] Add dead node check.
* [STORM-3739] NodeSortType changes and revert.
* [STORM-3739] Removed unused/commented code.
Co-authored-by: Bipin Prasad <bprasad@verizonmedia.com>
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index c584351..cc3561e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -19,12 +19,15 @@
package org.apache.storm.scheduler;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
@@ -283,6 +286,18 @@
Map<String, List<String>> getNetworkTopography();
/**
+ * Get the host-to-rack map - the inverse of {@link #getNetworkTopography()}.
+ */
+ default Map<String, String> getHostToRack() {
+ Map<String, String> ret = new HashMap<>();
+ Map<String, List<String>> networkTopography = getNetworkTopography();
+ if (networkTopography != null) {
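+ // Invert the rack -> hosts topology into a host -> rack lookup; each host belongs to exactly one rack.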
+ networkTopography.forEach((rack, hosts) -> hosts.forEach(host -> ret.put(host, rack)));
+ }
+ return ret;
+ }
+
+ /**
* Get all topology scheduler statuses.
*/
Map<String, String> getStatusMap();
@@ -339,4 +354,31 @@
* Get the nimbus configuration.
*/
Map<String, Object> getConf();
+
+ /**
+ * Determine the set of racks on which topologyIds have been assigned. Note that the returned set
+ * may contain {@link DNSToSwitchMapping#DEFAULT_RACK} if {@link #getHostToRack()} is null or
+ * does not contain the assigned host.
+ *
+ * @param topologyIds for which assignments are examined.
+ * @return set of racks on which assignments have been made.
+ */
+ default Set<String> getAssignedRacks(String... topologyIds) {
+ Set<String> ret = new HashSet<>();
+ Map<String, String> networkTopographyInverted = getHostToRack();
+ for (String topologyId: topologyIds) {
+ SchedulerAssignment assignment = getAssignmentById(topologyId);
+ if (assignment == null) {
+ continue;
+ }
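+ // Walk each assigned slot back to its rack: slot -> supervisor -> host -> rack.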
+ for (WorkerSlot slot : assignment.getSlots()) {
+ String nodeId = slot.getNodeId();
+ SupervisorDetails supervisorDetails = getSupervisorById(nodeId);
+ String hostId = supervisorDetails.getHost();
+ String rackId = networkTopographyInverted.getOrDefault(hostId, DNSToSwitchMapping.DEFAULT_RACK);
+ ret.add(rackId);
+ }
+ }
+ return ret;
+ }
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
index 427fb56..645c4e0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
@@ -156,6 +156,18 @@
return hostnameToNodes;
}
+ /**
+ * Get a map from node id to hostname.
+ *
+ * @return map of nodeId to hostname
+ */
+ public Map<String, String> getNodeIdToHostname() {
+ Map<String, String> nodeIdToHostname = new HashMap<>();
+ nodeMap.values()
+ .forEach(node -> nodeIdToHostname.put(node.getId(), node.getHostname()));
+ return nodeIdToHostname;
+ }
+
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index ea77b1a..de097a6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -247,10 +247,16 @@
return null;
}
+ private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb, String info) {
+ throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f', additionalInfo: '%s'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb, info));
+ }
+
private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
- + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
- used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
}
/**
@@ -375,7 +381,8 @@
return 0;
}
if (used.otherResources[i] > otherResources[i]) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+ String info = String.format("%s, %f > %f", getResourceNameForResourceIndex(i), used.otherResources[i], otherResources[i]);
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb, info);
}
min = Math.min(min, used.otherResources[i] / otherResources[i]);
}
@@ -384,14 +391,16 @@
/**
- * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
- * be less likely to be selected.
+ * If a node or rack has a kind of resource that is not in a request, make that resource negative so that, when sorting,
+ * that node or rack will be less likely to be selected. If the resource is in the request, make that resource positive.
* @param request the requested resources.
*/
public void updateForRareResourceAffinity(NormalizedResources request) {
int length = Math.min(this.otherResources.length, request.otherResources.length);
for (int i = 0; i < length; i++) {
- if (request.getResourceAt(i) == 0.0) {
- this.otherResources[i] = -1 * this.otherResources[i];
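+ // Flip the sign only when it disagrees with the request, so repeated calls remain idempotent.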
+ if (request.getResourceAt(i) == 0.0 && this.otherResources[i] > 0.0) {
+ this.otherResources[i] = -this.otherResources[i]; // make negative
+ } else if (request.getResourceAt(i) > 0.0 && this.otherResources[i] < 0.0) {
+ this.otherResources[i] = -this.otherResources[i]; // make positive
}
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 6f49083..4bc4d0b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -44,6 +44,7 @@
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
@@ -58,9 +59,25 @@
* Refer to {@link NodeSorter#NodeSorter(Cluster, TopologyDetails, NodeSortType)} for more details.
*/
public enum NodeSortType {
- GENERIC_RAS, // for deprecation, Used by GenericResourceAwareStrategyOld
- DEFAULT_RAS, // for deprecation, Used by DefaultResourceAwareStrategyOld
- COMMON // new and only node sorting type going forward
+ /**
+ * Generic Resource Aware Strategy sorting type.
+ * @deprecated used by GenericResourceAwareStrategyOld only. Use {@link #COMMON} instead.
+ */
+ @Deprecated
+ GENERIC_RAS,
+
+ /**
+ * Default Resource Aware Strategy sorting type.
+ * @deprecated used by DefaultResourceAwareStrategyOld only. Use {@link #COMMON} instead.
+ */
+ @Deprecated
+ DEFAULT_RAS,
+
+ /**
+ * The new and only node sorting type going forward.
+ * See {@link NodeSorterHostProximity#NodeSorterHostProximity(Cluster, TopologyDetails)} for more details.
+ */
+ COMMON,
}
// instance variables from class instantiation
@@ -149,7 +166,8 @@
List<ExecutorDetails> orderedExecutors = execSorter.sortExecutors(unassignedExecutors);
Iterable<String> sortedNodes = null;
if (!this.sortNodesForEachExecutor) {
- sortedNodes = nodeSorter.sortAllNodes(null);
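+ // Node sorting is prepared once without a specific executor, and the same ordering is reused for all executors.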
+ nodeSorter.prepare(null);
+ sortedNodes = nodeSorter.sortAllNodes();
}
return scheduleExecutorsOnNodes(orderedExecutors, sortedNodes);
}
@@ -189,7 +207,7 @@
LOG.debug("The max state search that will be used by topology {} is {}", topologyDetails.getId(), maxStateSearch);
searcherState = createSearcherState();
- setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
+ setNodeSorter(new NodeSorterHostProximity(cluster, topologyDetails, nodeSortType));
setExecSorter(orderExecutorsByProximity
? new ExecSorterByProximity(topologyDetails)
: new ExecSorterByConnectionCount(topologyDetails));
@@ -413,7 +431,8 @@
for (int i = 0; i < maxExecCnt ; i++) {
progressIdxForExec[i] = -1;
}
- LOG.info("scheduleExecutorsOnNodes: will assign {} executors for topo {}", maxExecCnt, topoName);
+ LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}, sortNodesForEachExecutor={}",
+ maxExecCnt, topoName, sortNodesForEachExecutor);
OUTERMOST_LOOP:
for (int loopCnt = 0 ; true ; loopCnt++) {
@@ -450,7 +469,8 @@
String comp = execToComp.get(exec);
if (sortedNodesIter == null || (this.sortNodesForEachExecutor && searcherState.isExecCompDifferentFromPrior())) {
progressIdx = -1;
- sortedNodesIter = nodeSorter.sortAllNodes(exec);
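+ // Re-sort nodes for this executor, since resource affinity can differ per component.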
+ nodeSorter.prepare(exec);
+ sortedNodesIter = nodeSorter.sortAllNodes();
}
for (String nodeId : sortedNodesIter) {
@@ -470,8 +490,8 @@
if (numBoundAckerAssigned == -1) {
// This only happens when trying to assign bound ackers to the worker slot and failed.
// Free the entire worker slot and put those bound ackers back to unassigned list
- LOG.debug("Failed to assign bound acker for exec: {} of topo: {} to worker: {}. Backtracking.",
- exec, topoName, workerSlot);
+ LOG.debug("Failed to assign bound acker for exec={}, comp={}, topo: {} to worker: {}. Backtracking.",
+ exec, comp, topoName, workerSlot);
searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
continue;
}
@@ -481,9 +501,13 @@
// and this is not the first exec to this workerSlot.
// So just go to next workerSlot and don't free the worker.
if (numBoundAckerAssigned > 0) {
- LOG.debug("Failed to assign exec: {} of topo: {} with bound ackers to worker: {}. Backtracking.",
- exec, topoName, workerSlot);
+ LOG.debug("Failed to assign exec={}, comp={}, topo={} with bound ackers to worker: {}. Backtracking.",
+ exec, comp, topoName, workerSlot);
searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
+ } else {
+ LOG.debug("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
+ exec, comp, topoName, workerSlot,
+ node.getId(), node.getAvailableCpuResources(), node.getAvailableMemoryResources());
}
continue;
}
@@ -506,8 +530,10 @@
searcherState = searcherState.nextExecutor();
nodeForExec[execIndex] = node;
workerSlotForExec[execIndex] = workerSlot;
- LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}, slot-ordinal={} at loopCnt={}, topo={}",
- execIndex, comp, nodeId, progressIdx, loopCnt, topoName);
+ LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}/cpu={}/mem={}, "
+ + "slot-ordinal={} at loopCnt={}, topo={}",
+ execIndex, comp, nodeId, node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
+ progressIdx, loopCnt, topoName);
continue OUTERMOST_LOOP;
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
index 5532d78..7b4f7d2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
@@ -30,7 +30,7 @@
/**
* Amongst all {@link #availableResources}, this is the minimum ratio of resource to the total available in group.
- * Note that nodes are grouped into racks. And racks are grouped under the cluster.
+ * Note that nodes are grouped into hosts, hosts into racks, and racks are grouped under the cluster.
*
* <p>
* An example of this calculation is in
@@ -43,7 +43,7 @@
/**
* Amongst all {@link #availableResources}, this is the average ratio of resource to the total available in group.
- * Note that nodes are grouped into racks. And racks are grouped under the cluster.
+ * Note that nodes are grouped into hosts, hosts into racks, and racks are grouped under the cluster.
*
* <p>
* An example of this calculation is in
@@ -73,6 +73,11 @@
this.avgResourcePercent = avgResourcePercent;
}
+ public void add(ObjectResourcesItem other) {
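+ // Aggregate another item's available and total resources into this one, e.g. to roll supervisors (numa zones) up into a host.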
+ this.availableResources.add(other.availableResources);
+ this.totalResources.add(other.totalResources);
+ }
+
@Override
public String toString() {
return this.id;
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
index ceda82e..f2a4e8d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
@@ -18,14 +18,20 @@
package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
-import java.util.TreeSet;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
public interface INodeSorter {
- TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec);
+ /**
+ * Prepare for node sorting. This method must be called before {@link #getSortedRacks()} and {@link #sortAllNodes()}.
+ *
+ * @param exec optional, may be null.
+ */
+ void prepare(ExecutorDetails exec);
- Iterable<String> sortAllNodes(ExecutorDetails exec);
+ Iterable<ObjectResourcesItem> getSortedRacks();
+
+ Iterable<String> sortAllNodes();
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
index 85a9015..7ff4b05 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -28,7 +29,6 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -70,6 +70,9 @@
protected List<String> favoredNodeIds;
protected List<String> unFavoredNodeIds;
+ // Updated in prepare method
+ ExecutorDetails exec;
+
/**
* Initialize for the default implementation node sorting.
*
@@ -93,21 +96,18 @@
// from Cluster
networkTopography = cluster.getNetworkTopography();
- Map<String, String> hostToRack = new HashMap<>();
- for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
- String rackId = entry.getKey();
- for (String hostName: entry.getValue()) {
- hostToRack.put(hostName, rackId);
- }
- }
+ Map<String, String> hostToRack = cluster.getHostToRack();
RasNodes nodes = new RasNodes(cluster);
for (RasNode node: nodes.getNodes()) {
String superId = node.getId();
String hostName = node.getHostname();
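+ // Skip dead nodes and nodes with no resolved hostname; neither can be mapped to a rack.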
+ if (!node.isAlive() || hostName == null) {
+ continue;
+ }
String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
superIdToRack.put(superId, rackId);
hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
- rackIdToNodes.computeIfAbsent(rackId, (hn) -> new ArrayList<>()).add(node);
+ rackIdToNodes.computeIfAbsent(rackId, (rid) -> new ArrayList<>()).add(node);
}
this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
@@ -122,16 +122,21 @@
unFavoredNodeIds.removeAll(favoredNodeIds);
}
+ @Override
+ public void prepare(ExecutorDetails exec) {
+ this.exec = exec;
+ }
+
/**
- * Scheduling uses {@link #sortAllNodes(ExecutorDetails)} which eventually
- * calls this method whose behavior can altered by setting {@link #nodeSortType}.
+ * Scheduling uses {@link #sortAllNodes()} which eventually
+ * calls this method whose behavior can be altered by setting {@link #nodeSortType}.
*
* @param resourcesSummary contains all individual {@link ObjectResourcesItem} as well as cumulative stats
* @param exec executor for which the sorting is done
* @param existingScheduleFunc a function to get existing executors already scheduled on this object
* @return a sorted list of {@link ObjectResourcesItem}
*/
- protected TreeSet<ObjectResourcesItem> sortObjectResources(
+ protected List<ObjectResourcesItem> sortObjectResources(
ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, ExistingScheduleFunc existingScheduleFunc) {
switch (nodeSortType) {
case DEFAULT_RAS:
@@ -180,7 +185,7 @@
* @param existingScheduleFunc a function to get existing executors already scheduled on this object
* @return a sorted list of ObjectResources
*/
- private TreeSet<ObjectResourcesItem> sortObjectResourcesCommon(
+ private List<ObjectResourcesItem> sortObjectResourcesCommon(
final ObjectResourcesSummary allResources, final ExecutorDetails exec,
final ExistingScheduleFunc existingScheduleFunc) {
// Copy and modify allResources
@@ -203,34 +208,31 @@
);
- // Use the following comparator to return a sorted set
+ // Use the following comparator to return a sorted list
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- double o1Avg = o1.avgResourcePercent;
- double o2Avg = o2.avgResourcePercent;
-
- if (o1Avg > o2Avg) {
- return -1;
- } else if (o1Avg < o2Avg) {
- return 1;
- } else {
- if (o1.minResourcePercent > o2.minResourcePercent) {
- return -1;
- } else if (o1.minResourcePercent < o2.minResourcePercent) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- }
- });
+ List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ sortedObjectResources.sort(comparator);
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
}
@@ -258,38 +260,35 @@
* @return a sorted list of ObjectResources
*/
@Deprecated
- private TreeSet<ObjectResourcesItem> sortObjectResourcesGeneric(
+ private List<ObjectResourcesItem> sortObjectResourcesGeneric(
final ObjectResourcesSummary allResources, ExecutorDetails exec,
final ExistingScheduleFunc existingScheduleFunc) {
ObjectResourcesSummary affinityBasedAllResources = new ObjectResourcesSummary(allResources);
NormalizedResourceRequest requestedResources = topologyDetails.getTotalResources(exec);
- for (ObjectResourcesItem objectResources : affinityBasedAllResources.getObjectResources()) {
- objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
- }
+ affinityBasedAllResources.getObjectResources()
+ .forEach(x -> x.availableResources.updateForRareResourceAffinity(requestedResources));
final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- double o1Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
- double o2Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
- if (o1Avg > o2Avg) {
- return -1;
- } else if (o1Avg < o2Avg) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- });
+ List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+ double o2Avg = availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ sortedObjectResources.sort(comparator);
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
}
@@ -313,7 +312,7 @@
* @return a sorted list of ObjectResources
*/
@Deprecated
- private TreeSet<ObjectResourcesItem> sortObjectResourcesDefault(
+ private List<ObjectResourcesItem> sortObjectResourcesDefault(
final ObjectResourcesSummary allResources,
final ExistingScheduleFunc existingScheduleFunc) {
@@ -328,32 +327,30 @@
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
}
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- if (o1.minResourcePercent > o2.minResourcePercent) {
- return -1;
- } else if (o1.minResourcePercent < o2.minResourcePercent) {
- return 1;
- } else {
- double diff = o1.avgResourcePercent - o2.avgResourcePercent;
- if (diff > 0.0) {
- return -1;
- } else if (diff < 0.0) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- }
- });
+ List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+ if (diff > 0.0) {
+ return -1;
+ } else if (diff < 0.0) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
sortedObjectResources.addAll(allResources.getObjectResources());
+ sortedObjectResources.sort(comparator);
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
}
@@ -375,7 +372,7 @@
* @param rackId the rack id availNodes are a part of
* @return a sorted list of nodes.
*/
- private TreeSet<ObjectResourcesItem> sortNodes(
+ private List<ObjectResourcesItem> sortNodes(
List<RasNode> availRasNodes, ExecutorDetails exec, String rackId,
Map<String, AtomicInteger> scheduledCount) {
ObjectResourcesSummary rackResourcesSummary = new ObjectResourcesSummary("RACK");
@@ -428,7 +425,7 @@
private final Iterator<String> post;
private final Set<String> skip;
- LazyNodeSortingIterator(LazyNodeSorting parent, TreeSet<ObjectResourcesItem> sortedRacks) {
+ LazyNodeSortingIterator(LazyNodeSorting parent, List<ObjectResourcesItem> sortedRacks) {
this.parent = parent;
rackIterator = sortedRacks.iterator();
pre = favoredNodeIds.iterator();
@@ -495,8 +492,8 @@
private class LazyNodeSorting implements Iterable<String> {
private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
- private final TreeSet<ObjectResourcesItem> sortedRacks;
- private final Map<String, TreeSet<ObjectResourcesItem>> cachedNodes = new HashMap<>();
+ private final List<ObjectResourcesItem> sortedRacks;
+ private final Map<String, List<ObjectResourcesItem>> cachedNodes = new HashMap<>();
private final ExecutorDetails exec;
private final Set<String> skippedNodeIds = new HashSet<>();
@@ -516,10 +513,10 @@
.getAndAdd(entry.getValue().size());
}
}
- sortedRacks = sortRacks(exec);
+ sortedRacks = getSortedRacks();
}
- private TreeSet<ObjectResourcesItem> getSortedNodesFor(String rackId) {
+ private List<ObjectResourcesItem> getSortedNodesFor(String rackId) {
return cachedNodes.computeIfAbsent(rackId,
(rid) -> sortNodes(rackIdToNodes.getOrDefault(rid, Collections.emptyList()), exec, rid, perNodeScheduledCount));
}
@@ -531,7 +528,7 @@
}
@Override
- public Iterable<String> sortAllNodes(ExecutorDetails exec) {
+ public Iterable<String> sortAllNodes() {
return new LazyNodeSorting(exec);
}
@@ -590,8 +587,7 @@
*
* @return a sorted list of racks
*/
- @Override
- public TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec) {
+ public List<ObjectResourcesItem> getSortedRacks() {
final ObjectResourcesSummary clusterResourcesSummary = createClusterSummarizedResources();
final Map<String, AtomicInteger> scheduledCount = getScheduledExecCntByRackId();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
new file mode 100644
index 0000000..64f9f08
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeSorterHostProximity.class);
+
+ // instance variables from class instantiation
+ protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
+
+ protected Cluster cluster;
+ protected TopologyDetails topologyDetails;
+
+ // Instance variables derived from Cluster.
+ private final Map<String, String> superIdToRack = new HashMap<>();
+ private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
+ private final Map<String, String> nodeIdToHostname = new HashMap<>();
+ private final Map<String, Set<String>> rackIdToHosts = new HashMap<>();
+ protected List<String> greyListedSupervisorIds;
+
+ // Instance variables from Cluster and TopologyDetails.
+ protected List<String> favoredNodeIds;
+ protected List<String> unFavoredNodeIds;
+
+ // Updated in prepare method
+ ExecutorDetails exec;
+
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails topologyDetails) {
+ this(cluster, topologyDetails, BaseResourceAwareStrategy.NodeSortType.COMMON);
+ }
+
+ /**
+ * Initialize for the default implementation node sorting.
+ *
+ * <p>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting implemented in
+ * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting implemented in
+ * {@link #sortObjectResourcesDefault(ObjectResourcesSummary, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting implemented in
+ * {@link #sortObjectResourcesCommon(ObjectResourcesSummary, ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * </p>
+ *
+ * @param cluster for which nodes will be sorted.
+ * @param topologyDetails the topology to sort for.
+ * @param nodeSortType type of sorting to be applied to object resource collection {@link BaseResourceAwareStrategy.NodeSortType}.
+ */
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+ this.cluster = cluster;
+ this.topologyDetails = topologyDetails;
+ this.nodeSortType = nodeSortType;
+
+ // from Cluster
+ greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+ Map<String, String> hostToRack = cluster.getHostToRack();
+ RasNodes nodes = new RasNodes(cluster);
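+ // Build node -> host -> rack lookups, skipping dead nodes and nodes without a resolved hostname.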
+ for (RasNode node: nodes.getNodes()) {
+ String superId = node.getId();
+ String hostName = node.getHostname();
+ if (!node.isAlive() || hostName == null) {
+ continue;
+ }
+ String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
+ superIdToRack.put(superId, rackId);
+ hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
+ nodeIdToHostname.put(superId, hostName);
+ rackIdToHosts.computeIfAbsent(rackId, r -> new HashSet<>()).add(hostName);
+ }
+
+ // from TopologyDetails
+ Map<String, Object> topoConf = topologyDetails.getConf();
+
+ // From Cluster and TopologyDetails - and cleaned-up
+ favoredNodeIds = makeHostToNodeIds((List<String>) topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+ unFavoredNodeIds = makeHostToNodeIds((List<String>) topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+ favoredNodeIds.removeAll(greyListedSupervisorIds);
+ unFavoredNodeIds.removeAll(greyListedSupervisorIds);
+ unFavoredNodeIds.removeAll(favoredNodeIds);
+ }
+
+ @VisibleForTesting
+ public Map<String, Set<String>> getRackIdToHosts() {
+ return rackIdToHosts;
+ }
+
+ @Override
+ public void prepare(ExecutorDetails exec) {
+ this.exec = exec;
+ }
+
+ /**
+ * Scheduling uses {@link #sortAllNodes()} which eventually
+ * calls this method whose behavior can be altered by setting {@link #nodeSortType}.
+ *
+ * @param resourcesSummary contains all individual {@link ObjectResourcesItem} as well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ protected Iterable<ObjectResourcesItem> sortObjectResources(
+ ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, ExistingScheduleFunc existingScheduleFunc) {
+ switch (nodeSortType) {
+ case DEFAULT_RAS:
+ return sortObjectResourcesDefault(resourcesSummary, existingScheduleFunc);
+ case GENERIC_RAS:
+ return sortObjectResourcesGeneric(resourcesSummary, exec, existingScheduleFunc);
+ case COMMON:
+ return sortObjectResourcesCommon(resourcesSummary, exec, existingScheduleFunc);
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Sort objects by the following three criteria.
+ *
+ * <li>
+ * The number of executors of the topology that need to be scheduled that are already on the object (node or rack),
+ * in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on
+ * the same object (node or rack) as the existing executors of the topology.
+ * </li>
+ *
+ * <li>
+ * The subordinate/subservient resource availability percentage of a rack in descending order. We calculate the
+ * resource availability percentage by dividing the resource availability of the object (node or rack) by the
+ * resource availability of the entire rack or cluster, depending on whether the object references a node or a rack.
+ * How this differs from the DefaultResourceAwareStrategy is that the percentage boosts the node or rack if it is
+ * requested by the executor that the sorting is being done for, and pulls it down if it is not.
+ * By doing this calculation, objects (node or rack) that have exhausted or have little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource availability, and nodes or racks that have
+ * resources that are not requested will be ranked below. So we will be less likely to pick a rack that
+ * has a lot of one resource but a low amount of another and has a lot of resources that are not requested by the executor.
+ * This is similar to the logic used in {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, ExistingScheduleFunc)}.
+ * </li>
+ *
+ * <li>
+ * The tie between two nodes with the same resource availability is broken by using the node with the lower minimum
+ * percentage used. This comparison was used in {@link #sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)}
+ * but here it is made subservient to the modified resource availability used in
+ * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, ExistingScheduleFunc)}.
+ *
+ * </li>
+ *
+ * @param allResources contains all individual ObjectResources as well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
+ final ObjectResourcesSummary allResources, final ExecutorDetails exec,
+ final ExistingScheduleFunc existingScheduleFunc) {
+ // Copy and modify allResources
+ ObjectResourcesSummary affinityBasedAllResources = new ObjectResourcesSummary(allResources);
+ final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
+ final NormalizedResourceRequest requestedResources = (exec != null) ? topologyDetails.getTotalResources(exec) : null;
+ affinityBasedAllResources.getObjectResources().forEach(
+ x -> {
+ if (requestedResources != null) {
+ // negate unrequested resources
+ x.availableResources.updateForRareResourceAffinity(requestedResources);
+ }
+ x.minResourcePercent = availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+ x.avgResourcePercent = availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+ LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, numExistingSchedule={}",
+ x.id, x.minResourcePercent, x.avgResourcePercent,
+ existingScheduleFunc.getNumExistingSchedule(x.id));
+ }
+ );
+
+ // Use the following comparator to sort
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
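+ // Compare by executors already scheduled (desc), then avg resource % (desc), then min resource % (desc), then id.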
+ int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new TreeSet<>(comparator);
+ sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Sort objects by the following two criteria.
+ *
+ * <li>the number of executors of the topology that need to be scheduled that are already on the
+ * object (node or rack), in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest
+ * of a topology on the same object (node or rack) as the existing executors of the topology.</li>
+ *
+ * <li>the subordinate/subservient resource availability percentage of a rack in descending order. We calculate the
+ * resource availability percentage by dividing the resource availability of the object (node or rack) by the
+ * resource availability of the entire rack or cluster, depending on whether the object references a node or a rack.
+ * How this differs from the DefaultResourceAwareStrategy is that the percentage boosts the node or rack if it is
+ * requested by the executor that the sorting is being done for, and pulls it down if it is not.
+ * By doing this calculation, objects (node or rack) that have exhausted or have little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource availability, and nodes or racks that have
+ * resources that are not requested will be ranked below. So we will be less likely to pick a rack that
+ * has a lot of one resource but a low amount of another and has a lot of resources that are not requested by the executor.</li>
+ *
+ * @param allResources contains all individual ObjectResources as well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ @Deprecated
+ private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
+ final ObjectResourcesSummary allResources, ExecutorDetails exec,
+ final ExistingScheduleFunc existingScheduleFunc) {
+ ObjectResourcesSummary affinityBasedAllResources = new ObjectResourcesSummary(allResources);
+ final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
+ final NormalizedResourceRequest requestedResources = (exec != null) ? topologyDetails.getTotalResources(exec) : null;
+ affinityBasedAllResources.getObjectResources().forEach(
+ x -> {
+ if (requestedResources != null) {
+ // negate unrequested resources
+ x.availableResources.updateForRareResourceAffinity(requestedResources);
+ }
+ x.minResourcePercent = availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+ x.avgResourcePercent = availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+ LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, numExistingSchedule={}",
+ x.id, x.minResourcePercent, x.avgResourcePercent,
+ existingScheduleFunc.getNumExistingSchedule(x.id));
+ }
+ );
+
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new TreeSet<>(comparator);
+ sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Sort objects by the following two criteria.
+ *
+ * <li>the number of executors of the topology that need to be scheduled that are already on the
+ * object (node or rack), in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest
+ * of a topology on the same object (node or rack) as the existing executors of the topology.</li>
+ *
+ * <li>the subordinate/subservient resource availability percentage of a rack in descending order. We calculate the
+ * resource availability percentage by dividing the resource availability of the object (node or rack) by the
+ * resource availability of the entire rack or cluster, depending on whether the object references a node or a rack.
+ * By doing this calculation, objects (node or rack) that have exhausted or have little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource availability. So we will be less likely to pick
+ * a rack that has a lot of one resource but a low amount of another.</li>
+ *
+ * @param allResources contains all individual ObjectResources as well as cumulative stats
+ * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ @Deprecated
+ private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
+ final ObjectResourcesSummary allResources,
+ final ExistingScheduleFunc existingScheduleFunc) {
+
+ final NormalizedResourceOffer availableResourcesOverall = allResources.getAvailableResourcesOverall();
+ for (ObjectResourcesItem objectResources : allResources.getObjectResources()) {
+ objectResources.minResourcePercent =
+ availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+ objectResources.avgResourcePercent =
+ availableResourcesOverall.calculateAveragePercentageUsedBy(objectResources.availableResources);
+ LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={}, numExistingSchedule={}",
+ objectResources.id, objectResources.minResourcePercent, objectResources.avgResourcePercent,
+ existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+ }
+
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+ if (diff > 0.0) {
+ return -1;
+ } else if (diff < 0.0) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new TreeSet<>(comparator);
+ sortedObjectResources.addAll(allResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Hosts are sorted by two criteria.
+ *
+ * <p>1) the number of executors of the topology that need to be scheduled that are already on the host, in
+ * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same host as the
+ * existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of a host in descending
+ * order. We calculate the resource availability percentage by dividing the resource availability on the host by the
+ * resource availability of the entire rack. By doing this calculation, hosts that have exhausted or have little of one
+ * of the resources mentioned above will be ranked after hosts that have more balanced resource availability. So we
+ * will be less likely to pick a host that has a lot of one resource but a low amount of another.
+ *
+ * @param availHosts a collection of all the hosts we want to sort
+ * @param rackId the rack id that the availHosts are a part of
+ * @return an iterable of sorted hosts.
+ */
+ private Iterable<ObjectResourcesItem> sortHosts(
+ Collection<String> availHosts, ExecutorDetails exec, String rackId,
+ Map<String, AtomicInteger> scheduledCount) {
+ ObjectResourcesSummary rackResourcesSummary = new ObjectResourcesSummary("RACK");
+ availHosts.forEach(h -> {
+ ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
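+ // Aggregate all supervisors (numa zones) on this host into a single host-level item.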
+ for (RasNode x : hostnameToNodes.get(h)) {
+ hostItem.add(new ObjectResourcesItem(x.getId(), x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
+ }
+ rackResourcesSummary.addObjectResourcesItem(hostItem);
+ });
+
+ LOG.debug(
+ "Rack {}: Overall Avail [ {} ] Total [ {} ]",
+ rackId,
+ rackResourcesSummary.getAvailableResourcesOverall(),
+ rackResourcesSummary.getTotalResourcesOverall());
+
+ return sortObjectResources(
+ rackResourcesSummary,
+ exec,
+ (hostId) -> {
+ AtomicInteger count = scheduledCount.get(hostId);
+ if (count == null) {
+ return 0;
+ }
+ return count.get();
+ });
+ }
+
+ /**
+ * Nodes are sorted by two criteria.
+ *
+ * <p>1) the number of executors of the topology that need to be scheduled that are already on the node, in
+ * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same node as the
+ * existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of a node in descending
+ * order. We calculate the resource availability percentage by dividing the resource availability on the node by the
+ * resource availability of the entire host. By doing this calculation, nodes that have exhausted or have little of one
+ * of the resources mentioned above will be ranked after nodes that have more balanced resource availability. So we
+ * will be less likely to pick a node that has a lot of one resource but a low amount of another.
+ *
+ * @param availRasNodes a list of all the nodes we want to sort
+ * @param hostId the host-id that the availRasNodes are a part of
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for nodes.
+ */
+ private Iterable<ObjectResourcesItem> sortNodes(
+ List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+ Map<String, AtomicInteger> scheduledCount) {
+ ObjectResourcesSummary hostResourcesSummary = new ObjectResourcesSummary("HOST");
+ availRasNodes.forEach(x ->
+ hostResourcesSummary.addObjectResourcesItem(
+ new ObjectResourcesItem(x.getId(), x.getTotalAvailableResources(), x.getTotalResources(), 0, 0)
+ )
+ );
+
+ LOG.debug(
+ "Host {}: Overall Avail [ {} ] Total [ {} ]",
+ hostId,
+ hostResourcesSummary.getAvailableResourcesOverall(),
+ hostResourcesSummary.getTotalResourcesOverall());
+
+ return sortObjectResources(
+ hostResourcesSummary,
+ exec,
+ (superId) -> {
+ AtomicInteger count = scheduledCount.get(superId);
+ if (count == null) {
+ return 0;
+ }
+ return count.get();
+ });
+ }
+
+ protected List<String> makeHostToNodeIds(List<String> hosts) {
+ if (hosts == null) {
+ return Collections.emptyList();
+ }
+ List<String> ret = new ArrayList<>(hosts.size());
+ for (String host: hosts) {
+ List<RasNode> nodes = hostnameToNodes.get(host);
+ if (nodes != null) {
+ for (RasNode node : nodes) {
+ ret.add(node.getId());
+ }
+ }
+ }
+ return ret;
+ }
+
+ private class LazyNodeSortingIterator implements Iterator<String> {
+ private final LazyNodeSorting parent;
+ private final Iterator<ObjectResourcesItem> rackIterator;
+ private Iterator<ObjectResourcesItem> hostIterator;
+ private Iterator<ObjectResourcesItem> nodeIterator;
+ private String nextValueFromNode = null;
+ private final Iterator<String> pre;
+ private final Iterator<String> post;
+ private final Set<String> skip;
+
+ LazyNodeSortingIterator(LazyNodeSorting parent, Iterable<ObjectResourcesItem> sortedRacks) {
+ this.parent = parent;
+ rackIterator = sortedRacks.iterator();
+ pre = favoredNodeIds.iterator();
+ post = Stream.concat(unFavoredNodeIds.stream(), greyListedSupervisorIds.stream())
+ .collect(Collectors.toList())
+ .iterator();
+ skip = parent.skippedNodeIds;
+ }
+
+ private Iterator<ObjectResourcesItem> getNodeIterator() {
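+ // Advance lazily: exhaust the nodes on the current host, then the hosts in the current rack, then move to the next rack.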
+ if (nodeIterator != null && nodeIterator.hasNext()) {
+ return nodeIterator;
+ }
+ //need to get the next host/node iterator
+ if (hostIterator != null && hostIterator.hasNext()) {
+ ObjectResourcesItem host = hostIterator.next();
+ final String hostId = host.id;
+ nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+ return nodeIterator;
+ }
+ if (rackIterator.hasNext()) {
+ ObjectResourcesItem rack = rackIterator.next();
+ final String rackId = rack.id;
+ hostIterator = parent.getSortedHostsForRack(rackId).iterator();
+ ObjectResourcesItem host = hostIterator.next();
+ final String hostId = host.id;
+ nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+ return nodeIterator;
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (pre.hasNext()) {
+ return true;
+ }
+ if (nextValueFromNode != null) {
+ return true;
+ }
+ while (true) {
+ //For the node we don't know if we have another one unless we look at the contents
+ Iterator<ObjectResourcesItem> nodeIterator = getNodeIterator();
+ if (nodeIterator == null || !nodeIterator.hasNext()) {
+ break;
+ }
+ String tmp = nodeIterator.next().id;
+ if (!skip.contains(tmp)) {
+ nextValueFromNode = tmp;
+ return true;
+ }
+ }
+ return post.hasNext();
+ }
+
+ @Override
+ public String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (pre.hasNext()) {
+ return pre.next();
+ }
+ if (nextValueFromNode != null) {
+ String tmp = nextValueFromNode;
+ nextValueFromNode = null;
+ return tmp;
+ }
+ return post.next();
+ }
+ }
+
+ private class LazyNodeSorting implements Iterable<String> {
+ private final Map<String, AtomicInteger> perHostScheduledCount = new HashMap<>();
+ private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
+ private final Iterable<ObjectResourcesItem> sortedRacks;
+ private final Map<String, Iterable<ObjectResourcesItem>> cachedHosts = new HashMap<>();
+ private final Map<String, Iterable<ObjectResourcesItem>> cachedNodesByHost = new HashMap<>();
+ private final ExecutorDetails exec;
+ private final Set<String> skippedNodeIds = new HashSet<>();
+
+ LazyNodeSorting(ExecutorDetails exec) {
+ this.exec = exec;
+ skippedNodeIds.addAll(favoredNodeIds);
+ skippedNodeIds.addAll(unFavoredNodeIds);
+ skippedNodeIds.addAll(greyListedSupervisorIds);
+
+ String topoId = topologyDetails.getId();
+ SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+ if (assignment != null) {
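+ // Seed per-host and per-node executor counts from this topology's existing assignment.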
+ for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+ assignment.getSlotToExecutors().entrySet()) {
+ String superId = entry.getKey().getNodeId();
+ String hostId = nodeIdToHostname.get(superId);
+ perHostScheduledCount.computeIfAbsent(hostId, id -> new AtomicInteger(0))
+ .getAndAdd(entry.getValue().size());
+ perNodeScheduledCount.computeIfAbsent(superId, id -> new AtomicInteger(0))
+ .getAndAdd(entry.getValue().size());
+ }
+ }
+ sortedRacks = getSortedRacks();
+ }
+
+ private Iterable<ObjectResourcesItem> getSortedHostsForRack(String rackId) {
+ return cachedHosts.computeIfAbsent(rackId,
+ id -> sortHosts(rackIdToHosts.getOrDefault(id, Collections.emptySet()), exec, id, perHostScheduledCount));
+ }
+
+ private Iterable<ObjectResourcesItem> getSortedNodesForHost(String hostId) {
+ return cachedNodesByHost.computeIfAbsent(hostId,
+ id -> sortNodes(hostnameToNodes.getOrDefault(id, Collections.emptyList()), exec, id, perNodeScheduledCount));
+ }
+
+ @Override
+ public Iterator<String> iterator() {
+ return new LazyNodeSortingIterator(this, sortedRacks);
+ }
+ }
+
+ @Override
+ public Iterable<String> sortAllNodes() {
+ return new LazyNodeSorting(exec);
+ }
+
+ private ObjectResourcesSummary createClusterSummarizedResources() {
+ ObjectResourcesSummary clusterResourcesSummary = new ObjectResourcesSummary("Cluster");
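+ // Roll node resources up into one item per rack; racks with no live hosts are skipped.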
+ rackIdToHosts.forEach((rackId, hostIds) -> {
+ if (hostIds == null || hostIds.isEmpty()) {
+ LOG.info("Ignoring Rack {} since it has no hosts", rackId);
+ } else {
+ ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
+ for (String hostId : hostIds) {
+ for (RasNode node : hostnameToNodes(hostId)) {
+ rack.availableResources.add(node.getTotalAvailableResources());
+ rack.totalResources.add(node.getTotalResources());
+ }
+ }
+ clusterResourcesSummary.addObjectResourcesItem(rack);
+ }
+ });
+
+ LOG.debug(
+ "Cluster Overall Avail [ {} ] Total [ {} ], rackCnt={}, hostCnt={}",
+ clusterResourcesSummary.getAvailableResourcesOverall(),
+ clusterResourcesSummary.getTotalResourcesOverall(),
+ clusterResourcesSummary.getObjectResources().size(),
+ rackIdToHosts.values().stream().mapToInt(x -> x.size()).sum());
+ return clusterResourcesSummary;
+ }
+
+ public Map<String, AtomicInteger> getScheduledExecCntByRackId() {
+ SchedulerAssignment assignment = cluster.getAssignmentById(topologyDetails.getId());
+ Map<String, AtomicInteger> scheduledCount = new HashMap<>();
+ if (assignment != null) {
+ for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+ assignment.getSlotToExecutors().entrySet()) {
+ String superId = entry.getKey().getNodeId();
+ String rackId = superIdToRack.get(superId);
+ scheduledCount.computeIfAbsent(rackId, (rid) -> new AtomicInteger(0))
+ .getAndAdd(entry.getValue().size());
+ }
+ }
+ return scheduledCount;
+ }
+
+ /**
+ * Racks are sorted by two criteria.
+ *
+     * <p>1) the number of executors of the topology that are already scheduled on the rack, in descending order.
+     * We sort on criterion 1 so that the rest of a topology is scheduled on the same rack as its existing
+     * executors.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a rack, in descending order. We calculate
+     * the resource availability percentage by dividing the resource availability on the rack by the resource availability of the entire
+     * cluster. Racks that have exhausted, or have little of, one of the resources will therefore be ranked after
+     * racks that have more balanced resource availability, so we are less likely to pick a rack that has a lot of one resource but a
+     * low amount of another.
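+     *
+     * <p>As a simplified illustration of criterion 2: if the cluster has 1000 cpu and 100000 MB available,
+     * a rack offering 100 cpu and 10000 MB (10% of each resource) ranks ahead of a rack offering 500 cpu
+     * but only 1000 MB (50% cpu, 1% memory), because the latter's scarcest resource is nearly exhausted.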
+ *
+ * @return an iterable of sorted racks
+ */
+ public Iterable<ObjectResourcesItem> getSortedRacks() {
+
+ final ObjectResourcesSummary clusterResourcesSummary = createClusterSummarizedResources();
+ final Map<String, AtomicInteger> scheduledCount = getScheduledExecCntByRackId();
+
+ return sortObjectResources(
+ clusterResourcesSummary,
+ exec,
+ (rackId) -> {
+ AtomicInteger count = scheduledCount.get(rackId);
+ if (count == null) {
+ return 0;
+ }
+ return count.get();
+ });
+ }
+
+ /**
+     * Get the nodes on a host.
+     *
+     * @param hostname the hostname.
+     * @return the {@link RasNode}s on that host.
+ */
+ public List<RasNode> hostnameToNodes(String hostname) {
+ return hostnameToNodes.getOrDefault(hostname, Collections.emptyList());
+ }
+
+ /**
+     * Interface for calculating the number of existing executors scheduled on an object (rack or node).
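+     *
+     * <p>Typically implemented inline as a lambda; for example, {@link #getSortedRacks()} supplies
+     * {@code (rackId) -> scheduledCount.get(rackId) == null ? 0 : scheduledCount.get(rackId).get()}.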
+ */
+ public interface ExistingScheduleFunc {
+ int getNumExistingSchedule(String objectId);
+ }
+}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 11de468..dcffa99 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -189,13 +189,34 @@
}
}
- public static Map<String, SupervisorDetails> genSupervisorsWithRacks(int numRacks, int numSupersPerRack, int numPorts, int rackStart,
- int superInRackStart, double cpu, double mem,
- Map<String, Double> miscResources) {
- Map<String, Double> resourceMap = new HashMap<>();
- resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
- resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
- resourceMap.putAll(miscResources);
+ public static Map<String, SupervisorDetails> genSupervisorsWithRacks(
+ int numRacks, int numSupersPerRack, int numPorts, int rackStart, int superInRackStart,
+ double cpu, double mem, Map<String, Double> miscResources) {
+
+ return genSupervisorsWithRacksAndNuma(numRacks, numSupersPerRack, 1, numPorts, rackStart,
+ superInRackStart, cpu, mem, miscResources, 1.0);
+ }
+
+ /**
+     * Takes one additional parameter, numaZonesPerHost, which determines how many supervisors
+     * are created on the same host. If numaResourceMultiplier is set to a factor below 1.0, then
+     * each subsequent numa zone has correspondingly lower cpu/mem than the previous numa zone.
+ *
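+     * <p>For example (illustrative): with numaZonesPerHost=2 and numaResourceMultiplier=0.5, supervisors
+     * r000s000n0 and r000s001n1 both land on host-000-rack-000, and the second numa zone gets half the
+     * cpu/mem of the first.
+     *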
+     * @param numRacks number of racks to create
+     * @param numSupersPerRack number of supervisors per rack
+     * @param numaZonesPerHost number of numa zones (supervisors) per host
+     * @param numPorts number of ports per supervisor
+     * @param rackStart starting rack number
+     * @param superInRackStart starting supervisor number within a rack
+     * @param cpu cpu capacity of the first numa zone on each host
+     * @param mem memory capacity (MB) of the first numa zone on each host
+     * @param miscResources additional resources to add to each supervisor
+     * @param numaResourceMultiplier cpu/mem for each numa zone is multiplied by this factor to obtain uneven resources
+     * @return map of supervisor id to SupervisorDetails
+ */
+ public static Map<String, SupervisorDetails> genSupervisorsWithRacksAndNuma(
+ int numRacks, int numSupersPerRack, int numaZonesPerHost, int numPorts, int rackStart, int superInRackStart,
+ double cpu, double mem, Map<String, Double> miscResources, double numaResourceMultiplier) {
Map<String, SupervisorDetails> retList = new HashMap<>();
for (int rack = rackStart; rack < numRacks + rackStart; rack++) {
for (int superInRack = superInRackStart; superInRack < (numSupersPerRack + superInRackStart); superInRack++) {
@@ -203,8 +224,23 @@
for (int p = 0; p < numPorts; p++) {
ports.add(p);
}
- SupervisorDetails sup = new SupervisorDetails(String.format("r%03ds%03d", rack, superInRack),
- String.format("host-%03d-rack-%03d", superInRack, rack), null, ports,
+ String superId;
+ String host;
+ int numaZone = superInRack % numaZonesPerHost;
+ if (numaZonesPerHost > 1) {
+ // multiple supervisors per host
+ int hostInRack = superInRack / numaZonesPerHost;
+ superId = String.format("r%03ds%03dn%d", rack, superInRack, numaZone);
+ host = String.format("host-%03d-rack-%03d", hostInRack, rack);
+ } else {
+ superId = String.format("r%03ds%03d", rack, superInRack);
+ host = String.format("host-%03d-rack-%03d", superInRack, rack);
+ }
+ Map<String, Double> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu * Math.pow(numaResourceMultiplier, numaZone));
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem * Math.pow(numaResourceMultiplier, numaZone));
+ resourceMap.putAll(miscResources);
+ SupervisorDetails sup = new SupervisorDetails(superId, host, null, ports,
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
retList.put(sup.getId(), sup);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 96a0f9a..fce3065 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -22,6 +22,9 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
@@ -36,6 +39,7 @@
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByConstraintSeverity;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONValue;
@@ -77,6 +81,12 @@
public TestConstraintSolverStrategy(boolean consolidatedConfigFlag) {
this.consolidatedConfigFlag = consolidatedConfigFlag;
+        List<Class<?>> classesToDebug = Arrays.asList(TestConstraintSolverStrategy.class,
+            BaseResourceAwareStrategy.class, ResourceAwareScheduler.class,
+            NodeSorterHostProximity.class, Cluster.class
+        );
+        Level logLevel = Level.INFO; // switch to Level.DEBUG for verbose scheduling output
+        classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
LOG.info("Running tests with consolidatedConfigFlag={}", consolidatedConfigFlag);
}
@@ -180,15 +190,15 @@
return makeTestTopoConf(1);
}
- public TopologyDetails makeTopology(Map<String, Object> config, int boltParallel) {
+ public static TopologyDetails makeTopology(Map<String, Object> config, int boltParallel) {
return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, "user");
}
- public Cluster makeCluster(Topologies topologies) {
+ public static Cluster makeCluster(Topologies topologies) {
return makeCluster(topologies, null);
}
- public Cluster makeCluster(Topologies topologies, Map<String, SupervisorDetails> supMap) {
+ public static Cluster makeCluster(Topologies topologies, Map<String, SupervisorDetails> supMap) {
if (supMap == null) {
supMap = genSupervisors(4, 2, 120, 1200);
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 1b1c035..b6f2a20 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -44,6 +44,7 @@
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
import org.apache.storm.topology.SharedOffHeapWithinNode;
import org.apache.storm.topology.SharedOffHeapWithinWorker;
import org.apache.storm.topology.SharedOnHeap;
@@ -74,7 +75,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
+
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
@@ -373,6 +375,15 @@
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
long numNodes = assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+ String assignmentString = "Assignments:\n\t" + assignment.getSlotToExecutors().entrySet().stream()
+ .map(x -> String.format("Node=%s, components=%s",
+ x.getKey().getNodeId(),
+ x.getValue().stream()
+ .map(y -> topo.getComponentFromExecutor(y))
+ .collect(Collectors.joining(","))
+ )
+ )
+ .collect(Collectors.joining("\n\t"));
if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_NONE) {
// Everything should fit in a single slot
@@ -411,7 +422,7 @@
int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
assertThat(numAssignedWorkers, is(8));
assertThat(assignment.getSlots().size(), is(8));
- assertThat(numNodes, is(2L));
+ assertThat(assignmentString, numNodes, is(2L));
} else if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT) {
double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
@@ -766,22 +777,17 @@
for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
String hostName = entry.getKey();
String rack = entry.getValue();
- List<String> nodesForRack = rackToNodes.get(rack);
- if (nodesForRack == null) {
- nodesForRack = new ArrayList<>();
- rackToNodes.put(rack, nodesForRack);
- }
- nodesForRack.add(hostName);
+ rackToNodes.computeIfAbsent(rack, rid -> new ArrayList<>()).add(hostName);
}
cluster.setNetworkTopography(rackToNodes);
DefaultResourceAwareStrategyOld rs = new DefaultResourceAwareStrategyOld();
rs.prepareForScheduling(cluster, topo1);
- INodeSorter nodeSorter = new NodeSorter(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
- TreeSet<ObjectResourcesItem> sortedRacks = nodeSorter.sortRacks(null);
+ INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ Iterable<ObjectResourcesItem> sortedRacks = nodeSorter.getSortedRacks();
- Assert.assertEquals("# of racks sorted", 6, sortedRacks.size());
Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
// Ranked first since rack-0 has the most balanced set of resources
Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
@@ -904,10 +910,10 @@
DefaultResourceAwareStrategyOld rs = new DefaultResourceAwareStrategyOld();
rs.prepareForScheduling(cluster, topo1);
- INodeSorter nodeSorter = new NodeSorter(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
- TreeSet<ObjectResourcesItem> sortedRacks= nodeSorter.sortRacks(null);
+ INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ Iterable<ObjectResourcesItem> sortedRacks= nodeSorter.getSortedRacks();
- Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
// Ranked first since rack-0 has the most balanced set of resources
Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
new file mode 100644
index 0000000..84b839f
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
@@ -0,0 +1,1036 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNodes;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@ExtendWith({NormalizedResourcesExtension.class})
+public class TestNodeSorterHostProximity {
+ private static final Logger LOG = LoggerFactory.getLogger(TestNodeSorterHostProximity.class);
+ private static final int CURRENT_TIME = 1450418597;
+
+ protected Class getDefaultResourceAwareStrategyClass() {
+ return DefaultResourceAwareStrategy.class;
+ }
+
+ private Config createClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
+ Map<String, Map<String, Number>> pools) {
+ Config config = TestUtilsForResourceAwareScheduler.createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, getDefaultResourceAwareStrategyClass().getName());
+ return config;
+ }
+
+ private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
+ private final Map<String, String> hostToRackMap;
+ private final Map<String, List<String>> rackToHosts;
+
+ @SafeVarargs
+ public TestDNSToSwitchMapping(Map<String, SupervisorDetails>... racks) {
+ Set<String> seenHosts = new HashSet<>();
+ Map<String, String> hostToRackMap = new HashMap<>();
+ Map<String, List<String>> rackToHosts = new HashMap<>();
+ for (int rackNum = 0; rackNum < racks.length; rackNum++) {
+ String rack = String.format("rack-%03d", rackNum);
+ for (SupervisorDetails sup : racks[rackNum].values()) {
+ hostToRackMap.put(sup.getHost(), rack);
+ String host = sup.getHost();
+ if (!seenHosts.contains(host)) {
+ rackToHosts.computeIfAbsent(rack, rid -> new ArrayList<>()).add(host);
+ seenHosts.add(host);
+ }
+ }
+ }
+ this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+ this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+ }
+
+ /**
+ * Use the "rack-%03d" embedded in the name of the supervisor to determine the rack number.
+ *
+         * @param supervisorDetailsCollection supervisors whose ids embed the rack name
+ */
+ public TestDNSToSwitchMapping(Collection<SupervisorDetails> supervisorDetailsCollection) {
+ Set<String> seenHosts = new HashSet<>();
+ Map<String, String> hostToRackMap = new HashMap<>();
+ Map<String, List<String>> rackToHosts = new HashMap<>();
+
+ for (SupervisorDetails supervisorDetails: supervisorDetailsCollection) {
+ String rackId = supervisorIdToRackName(supervisorDetails.getId());
+ hostToRackMap.put(supervisorDetails.getHost(), rackId);
+ String host = supervisorDetails.getHost();
+ if (!seenHosts.contains(host)) {
+ rackToHosts.computeIfAbsent(rackId, rid -> new ArrayList<>()).add(host);
+ seenHosts.add(host);
+ }
+ }
+ this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+ this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+ }
+
+ @Override
+ public Map<String, String> resolve(List<String> names) {
+ return hostToRackMap;
+ }
+
+ public Map<String, List<String>> getRackToHosts() {
+ return rackToHosts;
+ }
+ }
+
+ /**
+     * Test whether the strategy will choose the correct rack.
+ */
+ @Test
+ public void testMultipleRacks() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 1;
+ final double numaResourceMultiplier = 1.0;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+
+ final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate another rack of supervisors with less resources
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some supervisors that are depleted of one resource
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some that has a lot of memory but little of cpu
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+            10, 8000 * 2 + 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some that has a lot of cpu but little of memory
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 400 + 200 + 10, 1000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //Generate some that have neither resource, to verify that the strategy will prioritize this last
+ //Also put a generic resource with 0 value in the resources list, to verify that it doesn't affect the sorting
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack5 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 0.0, 0.0, Collections.singletonMap("gpu.count", 0.0), numaResourceMultiplier);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+ supMap.putAll(supMapRack5);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+ TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ List<String> supHostnames = new LinkedList<>();
+ for (SupervisorDetails sup : supMap.values()) {
+ supHostnames.add(sup.getHost());
+ }
+ Map<String, List<String>> rackToHosts = testDNSToSwitchMapping.getRackToHosts();
+ cluster.setNetworkTopography(rackToHosts);
+
+ NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ List<ObjectResourcesItem> sortedRacks = StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+ .collect(Collectors.toList());
+ String rackSummaries = sortedRacks.stream()
+ .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+ x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+ x.minResourcePercent, x.avgResourcePercent,
+ x.availableResources.getTotalCpu(),
+ x.availableResources.getTotalMemoryMb()))
+ .collect(Collectors.joining("\n\t"));
+ Assert.assertEquals(rackSummaries + "\n# of racks sorted", 6, sortedRacks.size());
+ Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+ Assert.assertEquals(rackSummaries + "\nrack-000 should be ordered first since it has the most balanced set of resources", "rack-000", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-001 should be ordered second since it has a balanced set of resources but less than rack-000", "rack-001", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-004 should be ordered third since it has a lot of cpu but not a lot of memory", "rack-004", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-003 should be ordered fourth since it has a lot of memory but not cpu", "rack-003", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-002 should be ordered fifth since it has not cpu resources", "rack-002", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nRack-005 should be ordered sixth since it has neither CPU nor memory available", "rack-005", it.next().id);
+ }
+
+ /**
+     * Test whether the strategy will choose the correct rack.
+ */
+ @Test
+ public void testMultipleRacksWithFavoritism() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 2;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+ final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 400, 8000, Collections.emptyMap(), 1.0);
+
+ //generate another rack of supervisors with less resources
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 200, 4000, Collections.emptyMap(), 1.0);
+
+ //generate some supervisors that are depleted of one resource
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 0, 8000, Collections.emptyMap(), 1.0);
+
+ //generate some that has a lot of memory but little of cpu
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 10, 8000 * 2 + 4000, Collections.emptyMap(), 1.0);
+
+ //generate some that has a lot of cpu but little of memory
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 400 + 200 + 10, 1000, Collections.emptyMap(), 1.0);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4);
+
+ Config t1Conf = new Config();
+ t1Conf.putAll(config);
+ final List<String> t1FavoredHostNames = Arrays.asList("host-41", "host-42", "host-43");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, t1FavoredHostNames);
+ final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", "host-2", "host-3");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, t1UnfavoredHostIds);
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+
+ Config t2Conf = new Config();
+ t2Conf.putAll(config);
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, Arrays.asList("host-31", "host-32", "host-33"));
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, Arrays.asList("host-11", "host-12", "host-13"));
+ TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ List<String> supHostnames = new LinkedList<>();
+ for (SupervisorDetails sup : supMap.values()) {
+ supHostnames.add(sup.getHost());
+ }
+ Map<String, List<String>> rackToHosts = testDNSToSwitchMapping.getRackToHosts();
+ cluster.setNetworkTopography(rackToHosts);
+
+ NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, topo1, BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ List<ObjectResourcesItem> sortedRacks = StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+ .collect(Collectors.toList());
+ String rackSummaries = sortedRacks.stream()
+ .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+ x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+ x.minResourcePercent, x.avgResourcePercent,
+ x.availableResources.getTotalCpu(),
+ x.availableResources.getTotalMemoryMb()))
+ .collect(Collectors.joining("\n\t"));
+
+ Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+ // Ranked first since rack-000 has the most balanced set of resources
+ Assert.assertEquals("rack-000 should be ordered first", "rack-000", it.next().id);
+ // Ranked second since rack-1 has a balanced set of resources but less than rack-0
+ Assert.assertEquals("rack-001 should be ordered second", "rack-001", it.next().id);
+ // Ranked third since rack-4 has a lot of cpu but not a lot of memory
+ Assert.assertEquals("rack-004 should be ordered third", "rack-004", it.next().id);
+        // Ranked fourth since rack-3 has a lot of memory but little cpu
+        Assert.assertEquals("rack-003 should be ordered fourth", "rack-003", it.next().id);
+        // Ranked last since rack-2 has no cpu resources
+        Assert.assertEquals("rack-002 should be ordered fifth", "rack-002", it.next().id);
+ }
+
+ /**
+ * Test if hosts are presented together regardless of resource availability.
+ * Supervisors are created with multiple Numa zones in such a manner that resources on two numa zones on the same host
+ * differ widely in resource availability.
+ */
+ @Test
+ public void testMultipleRacksWithHostProximity() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 12;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 3;
+ final double numaResourceMultiplier = 0.4;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+
+ final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate another rack of supervisors with less resources
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some supervisors that are depleted of one resource
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some that has a lot of memory but little of cpu
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+            10, 8000 * 2 + 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some that has a lot of cpu but little of memory
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 400 + 200 + 10, 1000, Collections.emptyMap(), numaResourceMultiplier);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4);
+
+ Config t1Conf = new Config();
+ t1Conf.putAll(config);
+ final List<String> t1FavoredHostNames = Arrays.asList("host-41", "host-42", "host-43");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, t1FavoredHostNames);
+ final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", "host-2", "host-3");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, t1UnfavoredHostIds);
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+
+ Config t2Conf = new Config();
+ t2Conf.putAll(config);
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, Arrays.asList("host-31", "host-32", "host-33"));
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, Arrays.asList("host-11", "host-12", "host-13"));
+ TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1);
+ nodeSorter.prepare(null);
+
+ Set<String> seenHosts = new HashSet<>();
+ String prevHost = null;
+        List<String> errLines = new ArrayList<>();
+ Map<String, String> nodeToHost = new RasNodes(cluster).getNodeIdToHostname();
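+        // nodes must come back grouped by host: once iteration moves past a host, that host must not reappear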
+ for (String nodeId: nodeSorter.sortAllNodes()) {
+ String host = nodeToHost.getOrDefault(nodeId, "no-host-for-node-" + nodeId);
+ errLines.add(String.format("\tnodeId:%s, host:%s", nodeId, host));
+ if (!host.equals(prevHost) && seenHosts.contains(host)) {
+ String err = String.format("Host %s for node %s is out of order:\n\t%s", host, nodeId, String.join("\n\t", errLines));
+ Assert.fail(err);
+ }
+ seenHosts.add(host);
+ prevHost = host;
+ }
+ }
+
+ /**
+ * Racks should be returned in order of decreasing capacity.
+ */
+ @Test
+ public void testMultipleRacksOrderedByCapacity() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 1;
+ final double numaResourceMultiplier = 1.0;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+
+ final Map<String, SupervisorDetails> supMapRack0 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 600, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 500, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 400, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+            300, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 200, 8000 - rackStartNum, Collections.emptyMap(), numaResourceMultiplier);
+
+ // too small to hold topology
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack5 = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum++, supStartNum,
+ 100, 8000 - rackStartNum, Collections.singletonMap("gpu.count", 0.0), numaResourceMultiplier);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+ supMap.putAll(supMapRack5);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+ TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, topo1);
+ nodeSorter.prepare(null);
+ List<ObjectResourcesItem> sortedRacks = StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+ .collect(Collectors.toList());
+ String rackSummaries = sortedRacks
+ .stream()
+ .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+ x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+ x.minResourcePercent, x.avgResourcePercent,
+ x.availableResources.getTotalCpu(),
+ x.availableResources.getTotalMemoryMb()))
+ .collect(Collectors.joining("\n\t"));
+ NormalizedResourceRequest topoResourceRequest = topo1.getApproximateTotalResources();
+ String topoRequest = String.format("Topo %s, approx-requested-resources %s", topo1.getId(), topoResourceRequest.toString());
+ Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nRack-000 should be ordered first since it has the largest capacity", "rack-000", it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-001 should be ordered second since it smaller than rack-000", "rack-001", it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-002 should be ordered third since it is smaller than rack-001", "rack-002", it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-003 should be ordered fourth since it since it is smaller than rack-002", "rack-003", it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-004 should be ordered fifth since it since it is smaller than rack-003", "rack-004", it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-005 should be ordered last since it since it is has smallest capacity", "rack-005", it.next().id);
+ }
+
+ /**
+ * Schedule two topologies, once with special resources and another without.
+ * There are enough special resources to hold one topology with special resource ("my.gpu").
+ * If the sort order is incorrect, scheduling will not succeed.
+ */
+ @Test
+ public void testAntiAffinityWithMultipleTopologies() {
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(1, 40, 66, 0, 0, 4700, 226200, new HashMap<>());
+ HashMap<String, Double> extraResources = new HashMap<>();
+ extraResources.put("my.gpu", 1.0);
+ supMap.putAll(genSupervisorsWithRacks(1, 40, 66, 1, 0, 4700, 226200, extraResources));
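+        // the first call creates rack r000 with no gpus; the second (rackStart=1) creates rack r001 with my.gpu=1.0 per supervisor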
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
+ 5, 100, 300, 0, 0, "user", 8192);
+
+ //Schedule the simple topology first
+ Topologies topologies = new Topologies(tdSimple);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ {
+ NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdSimple);
+ for (ExecutorDetails exec : tdSimple.getExecutors()) {
+ nodeSorter.prepare(exec);
+ List<ObjectResourcesItem> sortedRacks = StreamSupport
+ .stream(nodeSorter.getSortedRacks().spliterator(), false)
+ .collect(Collectors.toList());
+                String rackSummaries = sortedRacks.stream()
+                    .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                    .collect(Collectors.joining("\n\t"));
+ Assert.assertEquals(rackSummaries + "\n# of racks sorted", 2, sortedRacks.size());
+ Assert.assertEquals(rackSummaries + "\nFirst rack sorted", "rack-000", sortedRacks.get(0).id);
+ Assert.assertEquals(rackSummaries + "\nSecond rack sorted", "rack-001", sortedRacks.get(1).id);
+ }
+ }
+
+ scheduler.schedule(topologies, cluster);
+
+ TopologyBuilder builder = topologyBuilder(1, 5, 100, 300);
+ builder.setBolt("gpu-bolt", new TestBolt(), 40)
+ .addResource("my.gpu", 1.0)
+ .shuffleGrouping("spout-0");
+ TopologyDetails tdGpu = topoToTopologyDetails("topology-gpu", config, builder.createTopology(), 0, 0,"user", 8192);
+
+ //Now schedule GPU but with the simple topology in place.
+ topologies = new Topologies(tdSimple, tdGpu);
+ cluster = new Cluster(cluster, topologies);
+ {
+ NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdGpu);
+ for (ExecutorDetails exec : tdGpu.getExecutors()) {
+ String comp = tdGpu.getComponentFromExecutor(exec);
+ nodeSorter.prepare(exec);
+ List<ObjectResourcesItem> sortedRacks = StreamSupport
+ .stream(nodeSorter.getSortedRacks().spliterator(), false).collect(Collectors.toList());
+ String rackSummaries = sortedRacks.stream()
+ .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+ x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+ x.minResourcePercent, x.avgResourcePercent,
+ x.availableResources.getTotalCpu(),
+ x.availableResources.getTotalMemoryMb()))
+ .collect(Collectors.joining("\n\t"));
+ Assert.assertEquals(rackSummaries + "\n# of racks sorted", 2, sortedRacks.size());
+ if (comp.equals("gpu-bolt")) {
+ Assert.assertEquals(rackSummaries + "\nFirst rack sorted for " + comp, "rack-001", sortedRacks.get(0).id);
+ Assert.assertEquals(rackSummaries + "\nSecond rack sorted for " + comp, "rack-000", sortedRacks.get(1).id);
+ } else {
+ Assert.assertEquals(rackSummaries + "\nFirst rack sorted for " + comp, "rack-000", sortedRacks.get(0).id);
+ Assert.assertEquals(rackSummaries + "\nSecond rack sorted for " + comp, "rack-001", sortedRacks.get(1).id);
+ }
+ }
+ }
+
+ scheduler.schedule(topologies, cluster);
+
+ Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+ assertEquals(2, assignments.size());
+
+ Map<String, Map<String, AtomicLong>> topoPerRackCount = new HashMap<>();
+ for (Map.Entry<String, SchedulerAssignment> entry: assignments.entrySet()) {
+ SchedulerAssignment sa = entry.getValue();
+ Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+ for (WorkerSlot slot : sa.getSlots()) {
+ String nodeId = slot.getNodeId();
+ String rack = supervisorIdToRackName(nodeId);
+ slotsPerRack.computeIfAbsent(rack, (r) -> new AtomicLong(0)).incrementAndGet();
+ }
+ LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+ topoPerRackCount.put(entry.getKey(), slotsPerRack);
+ }
+
+ Map<String, AtomicLong> simpleCount = topoPerRackCount.get("topology-simple-0");
+ assertNotNull(simpleCount);
+ //Because the simple topology was scheduled first we want to be sure that it didn't put anything on
+ // the GPU nodes.
+ assertEquals(1, simpleCount.size()); //Only 1 rack is in use
+ assertFalse(simpleCount.containsKey("r001")); //r001 is the second rack with GPUs
+ assertTrue(simpleCount.containsKey("r000")); //r000 is the first rack with no GPUs
+
+ //We don't really care too much about the scheduling of topology-gpu-0, because it was scheduled.
+ }
+
+ /**
+ * Free one-fifth of WorkerSlots.
+ */
+ private void freeSomeWorkerSlots(Cluster cluster) {
+ Map<String, SchedulerAssignment> assignmentMap = cluster.getAssignments();
+ for (SchedulerAssignment schedulerAssignment: assignmentMap.values()) {
+ int i = 0;
+ List<WorkerSlot> slotsToKill = new ArrayList<>();
+ for (WorkerSlot workerSlot: schedulerAssignment.getSlots()) {
+ i++;
+ if (i % 5 == 0) {
+ slotsToKill.add(workerSlot);
+ }
+ }
+ cluster.freeSlots(slotsToKill);
+ }
+ }
+
+ /**
+ * If the topology is too large for one rack, it should be partially scheduled onto the next rack (and next rack only).
+ */
+ @Test
+ public void testFillUpRackAndSpilloverToNextRack() {
+ INimbus iNimbus = new INimbusTest();
+ double compPcore = 100;
+ double compOnHeap = 775;
+ double compOffHeap = 25;
+ int topo1NumSpouts = 1;
+ int topo1NumBolts = 5;
+ int topo1SpoutParallelism = 100;
+ int topo1BoltParallelism = 200;
+ final int numRacks = 3;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 6;
+ final int numZonesPerHost = 1;
+ final double numaResourceMultiplier = 1.0;
+ int rackStartNum = 0;
+ int supStartNum = 0;
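+        // topo1 needs 1*100 + 5*200 = 1100 executors' worth of resources; each rack holds only 4/5 of that (880)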
+ long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism) * 4/5; // not enough for topo1
+ long compPerSuper = compPerRack / numSupersPerRack;
+ double cpuPerSuper = compPcore * compPerSuper;
+ double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+ double topo1MaxHeapSize = memPerSuper;
+ final String topoName1 = "topology1";
+
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum, supStartNum,
+ cpuPerSuper, memPerSuper, Collections.emptyMap(), numaResourceMultiplier);
+ TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+ topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+ //Schedule the topo1 topology and ensure it fits on 2 racks
+ Topologies topologies = new Topologies(td1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ scheduler.schedule(topologies, cluster);
+ Set<String> assignedRacks = cluster.getAssignedRacks(td1.getId());
+ assertEquals("Racks for topology=" + td1.getId() + " is " + assignedRacks, 2, assignedRacks.size());
+ }
+
+ /**
+ * Rack with low resources should be used to schedule an executor if it has other executors for the same topology.
+ * <li>Schedule topo1 on one rack</li>
+ * <li>unassign some executors</li>
+ * <li>schedule another topology to partially fill up rack1</li>
+     * <li>Add another rack and schedule topology 1's remaining executors again</li>
+     * <li>scheduling should utilize all resources on rack1 before trying the next rack</li>
+ */
+ @Test
+ public void testPreferRackWithTopoExecutors() {
+ INimbus iNimbus = new INimbusTest();
+ double compPcore = 100;
+ double compOnHeap = 775;
+ double compOffHeap = 25;
+ int topo1NumSpouts = 1;
+ int topo1NumBolts = 5;
+ int topo1SpoutParallelism = 100;
+ int topo1BoltParallelism = 200;
+ int topo2NumSpouts = 1;
+ int topo2NumBolts = 5;
+ int topo2SpoutParallelism = 10;
+ int topo2BoltParallelism = 20;
+ final int numRacks = 3;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 6;
+ final int numZonesPerHost = 1;
+ final double numaResourceMultiplier = 1.0;
+ int rackStartNum = 0;
+ int supStartNum = 0;
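+        // topo1 needs 1*100 + 5*200 = 1100 executors' worth of resources and topo2 needs 1*10 + 5*20 = 110;
+        // each rack holds 1100 + 10 = 1110, enough for topo1 alone but not topo1 + topo2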
+ long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism
+ + topo2NumSpouts * topo2SpoutParallelism); // enough for topo1 but not topo1+topo2
+ long compPerSuper = compPerRack / numSupersPerRack;
+ double cpuPerSuper = compPcore * compPerSuper;
+ double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+ double topo1MaxHeapSize = memPerSuper;
+ double topo2MaxHeapSize = memPerSuper;
+ final String topoName1 = "topology1";
+ final String topoName2 = "topology2";
+
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, rackStartNum, supStartNum,
+ cpuPerSuper, memPerSuper, Collections.emptyMap(), numaResourceMultiplier);
+ TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+ topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+ //Schedule the topo1 topology and ensure it fits on 1 rack
+ Topologies topologies = new Topologies(td1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ scheduler.schedule(topologies, cluster);
+ Set<String> assignedRacks = cluster.getAssignedRacks(td1.getId());
+ assertEquals("Racks for topology=" + td1.getId() + " is " + assignedRacks, 1, assignedRacks.size());
+
+ TopologyBuilder builder = topologyBuilder(topo2NumSpouts, topo2NumBolts, topo2SpoutParallelism, topo2BoltParallelism);
+ TopologyDetails td2 = topoToTopologyDetails(topoName2, config, builder.createTopology(), 0, 0,"user", topo2MaxHeapSize);
+
+        // Now schedule topo2 with topo1 already in place.
+ topologies = new Topologies(td1, td2);
+ cluster = new Cluster(cluster, topologies);
+ scheduler.schedule(topologies, cluster);
+
+ assignedRacks = cluster.getAssignedRacks(td1.getId(), td2.getId());
+ assertEquals("Racks for topologies=" + td1.getId() + "/" + td2.getId() + " is " + assignedRacks, 2, assignedRacks.size());
+
+ // topo2 gets scheduled on its own rack because it is empty and available
+ assignedRacks = cluster.getAssignedRacks(td2.getId());
+ assertEquals("Racks for topologies=" + td2.getId() + " is " + assignedRacks, 1, assignedRacks.size());
+
+        // now unassign topo2, expect only one rack to be in use; free some slots and reschedule some topo1 executors
+ cluster.unassign(td2.getId());
+ assignedRacks = cluster.getAssignedRacks(td2.getId());
+ assertEquals("After unassigning topology " + td2.getId() + ", racks for topology=" + td2.getId() + " is " + assignedRacks,
+ 0, assignedRacks.size());
+ assignedRacks = cluster.getAssignedRacks(td1.getId());
+ assertEquals("After unassigning topology " + td2.getId() + ", racks for topology=" + td1.getId() + " is " + assignedRacks,
+ 1, assignedRacks.size());
+ assertFalse("Topology " + td1.getId() + " should be fully assigned before freeing slots", cluster.needsSchedulingRas(td1));
+ freeSomeWorkerSlots(cluster);
+ assertTrue("Topology " + td1.getId() + " should need scheduling after freeing slots", cluster.needsSchedulingRas(td1));
+
+ // then reschedule executors
+ scheduler.schedule(topologies, cluster);
+
+ // only one rack should be in use by topology1
+ assignedRacks = cluster.getAssignedRacks(td1.getId());
+ assertEquals("After reassigning topology " + td2.getId() + ", racks for topology=" + td1.getId() + " is " + assignedRacks,
+ 1, assignedRacks.size());
+ }
+
+ /**
+ * Assign and then clear out a rack to host list mapping in cluster.networkTopography.
+ * Expected behavior is that:
+ * <li>the rack without hosts does not show up in {@link NodeSorterHostProximity#getSortedRacks()}</li>
+     * <li>all the supervisor nodes still get returned in {@link NodeSorterHostProximity#sortAllNodes()}</li>
+     * <li>supervisors on the cleared rack show up under {@link DNSToSwitchMapping#DEFAULT_RACK}</li>
+ *
+ * <p>
+     * Force an unusual condition, where one of the racks is still passed to LazyNodeSortingIterator with
+     * an empty host list, and then ensure that the code is resilient.
+ * </p>
+ */
+ @Test
+ void testWithImpairedClusterNetworkTopography() {
+ INimbus iNimbus = new INimbusTest();
+ double compPcore = 100;
+ double compOnHeap = 775;
+ double compOffHeap = 25;
+ int topo1NumSpouts = 1;
+ int topo1NumBolts = 5;
+ int topo1SpoutParallelism = 100;
+ int topo1BoltParallelism = 200;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 66;
+ long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism + 10);
+ long compPerSuper = compPerRack / numSupersPerRack;
+ double cpuPerSuper = compPcore * compPerSuper;
+ double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+ double topo1MaxHeapSize = memPerSuper;
+ final String topoName1 = "topology1";
+ int numRacks = 3;
+
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(numRacks, numSupersPerRack, numPortsPerSuper,
+ 0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+ TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+ topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+ Topologies topologies = new Topologies(td1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ Map<String, List<String>> networkTopography = cluster.getNetworkTopography();
+ assertEquals("Expecting " + numRacks + " racks found " + networkTopography.size(), numRacks, networkTopography.size());
+ assertTrue("Expecting racks count to be >= 3, found " + networkTopography.size(), networkTopography.size() >= 3);
+
+        // Impair cluster.networkTopography by setting one rack to have zero hosts; getSortedRacks should exclude this rack.
+        // Keep the supervisorDetails unchanged - confirm that these nodes are not lost even with an incomplete networkTopography
+ String rackIdToZero = networkTopography.keySet().stream().findFirst().get();
+ impairClusterRack(cluster, rackIdToZero, true, false);
+
+ NodeSorterHostProximity nodeSorterHostProximity = new NodeSorterHostProximity(cluster, td1);
+ nodeSorterHostProximity.getSortedRacks().forEach(x -> assertNotEquals(x.id, rackIdToZero));
+
+ // confirm that the above action has not lost the hosts and that they appear under the DEFAULT rack
+ {
+ Set<String> seenRacks = new HashSet<>();
+ nodeSorterHostProximity.getSortedRacks().forEach(x -> seenRacks.add(x.id));
+ assertEquals("Expecting rack cnt to be still " + numRacks, numRacks, seenRacks.size());
+ assertTrue("Expecting to see default-rack=" + DNSToSwitchMapping.DEFAULT_RACK + " in sortedRacks",
+ seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+ }
+
+ // now check if node/supervisor is missing when sorting all nodes
+ Set<String> expectedNodes = supMap.keySet();
+ Set<String> seenNodes = new HashSet<>();
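+ // prepare the sorter with no specific executor, then sort every node in the cluster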
+ nodeSorterHostProximity.prepare(null);
+ nodeSorterHostProximity.sortAllNodes().forEach(seenNodes::add);
+ assertEquals("Expecting to see all supervisors", expectedNodes, seenNodes);
+
+ // Now fully impair the cluster - confirm no default rack
+ {
+ cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(new TestDNSToSwitchMapping(supMap.values()).getRackToHosts());
+ impairClusterRack(cluster, rackIdToZero, true, true);
+ Set<String> seenRacks = new HashSet<>();
+ NodeSorterHostProximity nodeSorterHostProximity2 = new NodeSorterHostProximity(cluster, td1);
+ nodeSorterHostProximity2.getSortedRacks().forEach(x -> seenRacks.add(x.id));
+ Map<String, Set<String>> rackIdToHosts = nodeSorterHostProximity2.getRackIdToHosts();
+ String dumpOfRacks = rackIdToHosts.entrySet().stream()
+ .map(x -> String.format("rack %s -> hosts [%s]", x.getKey(), String.join(",", x.getValue())))
+ .collect(Collectors.joining("\n\t"));
+ assertEquals("Expecting rack cnt to be " + (numRacks - 1) + " but found " + seenRacks.size() + "\n\t" + dumpOfRacks,
+ numRacks - 1, seenRacks.size());
+ assertFalse("Found default-rack=" + DNSToSwitchMapping.DEFAULT_RACK + " in \n\t" + dumpOfRacks,
+ seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+ }
+ }
+
+ /**
+ * Blacklist all nodes for a rack before sorting nodes.
+ * Confirm that {@link NodeSorterHostProximity#sortAllNodes()} still works.
+ */
+ @Test
+ void testWithBlackListedHosts() {
+ INimbus iNimbus = new INimbusTest();
+ double compPcore = 100;
+ double compOnHeap = 775;
+ double compOffHeap = 25;
+ int topo1NumSpouts = 1;
+ int topo1NumBolts = 5;
+ int topo1SpoutParallelism = 100;
+ int topo1BoltParallelism = 200;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 66;
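+ // Same per-rack sizing as in testWithImpairedClusterNetworkTopography: capacity for all executors plus a small buffer.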
+ long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + topo1NumBolts * topo1BoltParallelism + 10);
+ long compPerSuper = compPerRack / numSupersPerRack;
+ double cpuPerSuper = compPcore * compPerSuper;
+ double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+ double topo1MaxHeapSize = memPerSuper;
+ final String topoName1 = "topology1";
+ int numRacks = 3;
+
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(numRacks, numSupersPerRack, numPortsPerSuper,
+ 0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+ TestDNSToSwitchMapping testDNSToSwitchMapping = new TestDNSToSwitchMapping(supMap.values());
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(compPcore, compOnHeap, compOffHeap, null, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+ topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, "user", topo1MaxHeapSize);
+
+ Topologies topologies = new Topologies(td1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ Map<String, List<String>> networkTopography = cluster.getNetworkTopography();
+ assertEquals("Expecting " + numRacks + " racks found " + networkTopography.size(), numRacks, networkTopography.size());
+ assertTrue("Expecting racks count to be >= 3, found " + networkTopography.size(), networkTopography.size() >= 3);
+
+ Set<String> blackListedHosts = new HashSet<>();
+ List<SupervisorDetails> supArray = new ArrayList<>(supMap.values());
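+ // first, blacklist the hosts of the first numSupersPerRack supervisors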
+ for (int i = 0; i < numSupersPerRack; i++) {
+ blackListedHosts.add(supArray.get(i).getHost());
+ }
+ blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster, td1);
+
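+ // then blacklist every host on one whole rack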
+ String rackToClear = cluster.getNetworkTopography().keySet().stream().findFirst().get();
+ blackListedHosts = new HashSet<>(cluster.getNetworkTopography().get(rackToClear));
+ blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster, td1);
+ }
+
+ // Impair the cluster by blacklisting the given hosts, then verify rack and node sorting still work
+ private void blacklistHostsAndSortNodes(
+ Set<String> blackListedHosts, Collection<SupervisorDetails> sups, Cluster cluster, TopologyDetails td1) {
+ LOG.info("blackListedHosts={}", blackListedHosts);
+ cluster.setBlacklistedHosts(blackListedHosts);
+
+ NodeSorterHostProximity nodeSorterHostProximity = new NodeSorterHostProximity(cluster, td1);
+ // confirm that blacklisting removes exactly the blacklisted hosts from the sorted racks
+ {
+ Set<String> allHosts = sups.stream().map(x -> x.getHost()).collect(Collectors.toSet());
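+ // note: seenRacks is collected to exercise getSortedRacks() but is not asserted on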
+ Set<String> seenRacks = new HashSet<>();
+ nodeSorterHostProximity.getSortedRacks().forEach(x -> seenRacks.add(x.id));
+ Set<String> seenHosts = new HashSet<>();
+ nodeSorterHostProximity.getRackIdToHosts().forEach((k, v) -> seenHosts.addAll(v));
+ allHosts.removeAll(seenHosts);
+ assertEquals("Expecting only blacklisted hosts removed", allHosts, blackListedHosts);
+ }
+
+ // now check if sortAllNodes still works
+ Set<String> expectedNodes = sups.stream()
+ .filter(x -> !blackListedHosts.contains(x.getHost()))
+ .map(x -> x.getId())
+ .collect(Collectors.toSet());
+ Set<String> seenNodes = new HashSet<>();
+ nodeSorterHostProximity.prepare(null);
+ nodeSorterHostProximity.sortAllNodes().forEach(seenNodes::add);
+ assertEquals("Expecting to see all supervisors", expectedNodes, seenNodes);
+ }
+
+ /**
+ * Impair the cluster for a specified rackId by:
+ * <li>emptying the rack's host list</li>
+ * <li>removing the supervisors for the hosts on the rack</li>
+ *
+ * @param cluster cluster to impair
+ * @param rackId rackId to clear
+ * @param clearNetworkTopography if true, clear (but do not remove) the rack's host list.
+ * @param clearSupervisorMap if true, remove the supervisors for the rack.
+ */
+ private void impairClusterRack(Cluster cluster, String rackId, boolean clearNetworkTopography, boolean clearSupervisorMap) {
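+ // snapshot the rack's hosts before clearing so their supervisors can still be looked up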
+ Set<String> hostIds = new HashSet<>(cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()));
+ if (clearNetworkTopography) {
+ cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()).clear();
+ }
+ if (clearSupervisorMap) {
+ Set<String> supToRemove = new HashSet<>();
+ for (String hostId: hostIds) {
+ cluster.getSupervisorsByHost(hostId).forEach(s -> supToRemove.add(s.getId()));
+ }
+ Map<String, SupervisorDetails> supervisorDetailsMap = cluster.getSupervisors();
+ for (String supId: supToRemove) {
+ supervisorDetailsMap.remove(supId);
+ }
+ }
+ }
+}
\ No newline at end of file