Merge pull request #3114 from dandsager1/STORM-3488
STORM-3488 Scheduling can cause RAS_Node resources to become negative
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 756eb79..04f0960 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -564,11 +564,7 @@
}
double currentTotal = 0.0;
- double afterTotal = 0.0;
- double afterOnHeap = 0.0;
-
double currentCpuTotal = 0.0;
- double afterCpuTotal = 0.0;
Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
wouldBeAssigned.add(exec);
@@ -582,18 +578,17 @@
currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
currentCpuTotal = wrCurrent.get_cpu();
}
- WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
- afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
- afterOnHeap = wrAfter.get_mem_on_heap();
- currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment);
- afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, exec);
- afterCpuTotal = wrAfter.get_cpu();
- } else {
- WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
- afterCpuTotal = wrAfter.get_cpu();
+ currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td);
}
+ WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+ double afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
+ afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td, exec);
+
+ double afterOnHeap = wrAfter.get_mem_on_heap();
+ double afterCpuTotal = wrAfter.get_cpu();
+
double cpuAdded = afterCpuTotal - currentCpuTotal;
double cpuAvailable = resourcesAvailable.getTotalCpu();
@@ -681,7 +676,7 @@
assignment.assign(slot, executors, resources);
String nodeId = slot.getNodeId();
- double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment);
+ double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment, td);
assignment.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);
updateCachesForWorkerSlot(slot, resources, topologyId, sharedOffHeapNodeMemory);
totalResourcesPerNodeCache.remove(slot.getNodeId());
@@ -712,27 +707,28 @@
*
* @param nodeId the id of the node
* @param assignment the current assignment
+ * @param td the topology details
* @return the amount of shared off heap node memory for that node in MB
*/
- private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment) {
- return calculateSharedOffHeapNodeMemory(nodeId, assignment, null);
+ private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td) {
+ return calculateSharedOffHeapNodeMemory(nodeId, assignment, td, null);
}
private double calculateSharedOffHeapNodeMemory(
- String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
- double memorySharedWithinNode = 0.0;
- TopologyDetails td = topologies.getById(assignment.getTopologyId());
+ String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td, ExecutorDetails extra) {
Set<ExecutorDetails> executorsOnNode = new HashSet<>();
- for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
- assignment.getSlotToExecutors().entrySet()) {
- if (nodeId.equals(entry.getKey().getNodeId())) {
- executorsOnNode.addAll(entry.getValue());
+ if (assignment != null) {
+ for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry : assignment.getSlotToExecutors().entrySet()) {
+ if (nodeId.equals(entry.getKey().getNodeId())) {
+ executorsOnNode.addAll(entry.getValue());
+ }
}
}
if (extra != null) {
executorsOnNode.add(extra);
}
//Now check for overlap on the node
+ double memorySharedWithinNode = 0.0;
for (SharedMemory shared : td.getSharedMemoryRequests(executorsOnNode)) {
memorySharedWithinNode += shared.get_off_heap_node();
}
@@ -751,8 +747,9 @@
assertValidTopologyForModification(assignment.getTopologyId());
assignment.unassignBySlot(slot);
String nodeId = slot.getNodeId();
+ TopologyDetails td = topologies.getById(assignment.getTopologyId());
assignment.setTotalSharedOffHeapNodeMemory(
- nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment));
+ nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment, td));
nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).remove(slot);
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index b10ad91..31d8ecb 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -123,22 +123,22 @@
double memoryOnHeap = 10;
double memoryOffHeap = 10;
double sharedOnHeapWithinWorker = 450;
- double sharedOffHeapNode = 600;
+ double sharedOffHeapWithinNode = 600;
double sharedOffHeapWithinWorker = 400;
TopologyBuilder builder = new TopologyBuilder();
switch (memoryType) {
case SHARED_OFF_HEAP_NODE:
builder.setSpout("spout", new TestSpout(), spoutParallelism)
- .addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "spout shared off heap node"));
+ .addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "spout shared off heap within node"));
break;
case SHARED_OFF_HEAP_WORKER:
builder.setSpout("spout", new TestSpout(), spoutParallelism)
- .addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "spout shared off heap worker"));
+ .addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "spout shared off heap within worker"));
break;
case SHARED_ON_HEAP_WORKER:
builder.setSpout("spout", new TestSpout(), spoutParallelism)
- .addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "spout shared on heap worker"));
+ .addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "spout shared on heap within worker"));
break;
}
StormTopology stormToplogy = builder.createTopology();
@@ -168,9 +168,9 @@
case SHARED_OFF_HEAP_NODE:
// 4 workers on single node. OffHeapNode memory is shared
assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
- assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap + sharedOffHeapNode, 0.01));
+ assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap + sharedOffHeapWithinNode, 0.01));
assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0, 0.01));
- assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(sharedOffHeapNode, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(sharedOffHeapWithinNode, 0.01));
assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
assertThat(numNodes, is(1L));
@@ -201,6 +201,75 @@
}
}
+    /*
+     * Test that scheduling does not cause node resources to become negative (STORM-3488).
+     */
+ @Test
+ public void testSchedulingNegativeResources() {
+ int spoutParallelism = 2;
+ int boltParallelism = 2;
+ double cpuPercent = 10;
+ double memoryOnHeap = 10;
+ double memoryOffHeap = 10;
+ double sharedOnHeapWithinWorker = 400;
+ double sharedOffHeapWithinNode = 700;
+ double sharedOffHeapWithinWorker = 500;
+
+ Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
+ TopologyDetails[] topo = new TopologyDetails[2];
+
+ // 1st topology
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestSpout(),
+ spoutParallelism);
+ builder.setBolt("bolt-1", new TestBolt(),
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
+ builder.setBolt("bolt-2", new TestBolt(),
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
+ builder.setBolt("bolt-3", new TestBolt(),
+ boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
+ StormTopology stormToplogy = builder.createTopology();
+
+ conf.put(Config.TOPOLOGY_PRIORITY, 1);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology-0");
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+ topo[0] = new TopologyDetails("testTopology-id-0", conf, stormToplogy, 0,
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+ // 2nd topology
+ builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestSpout(),
+ spoutParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "spout shared off heap within node"));
+ stormToplogy = builder.createTopology();
+
+ conf.put(Config.TOPOLOGY_PRIORITY, 0);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology-1");
+ topo[1] = new TopologyDetails("testTopology-id-1", conf, stormToplogy, 0,
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+ Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 500, 2000);
+ Topologies topologies = new Topologies(topo[0]);
+ Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+ // schedule 1st topology
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(conf);
+ scheduler.schedule(topologies, cluster);
+ assertTopologiesFullyScheduled(cluster, topo[0].getName());
+
+        // Attempt to schedule both topologies.
+        // Previously this triggered a negative-resource event because the second topology was scheduled incorrectly with the first in place.
+        // The first topology should be evicted so that the higher-priority (lower value) second topology can schedule successfully.
+ topologies = new Topologies(topo[0], topo[1]);
+ cluster = new Cluster(cluster, topologies);
+ scheduler.schedule(topologies, cluster);
+ assertTopologiesNotScheduled(cluster, topo[0].getName());
+ assertTopologiesFullyScheduled(cluster, topo[1].getName());
+
+        // Verify that no negative-resource events were recorded during scheduling.
+ assertThat(cluster.getResourceMetrics().getNegativeResourceEventsMeter().getCount(), is(0L));
+ }
+
/**
* test if the scheduling shared memory is correct with/without oneExecutorPerWorker enabled
*/
@@ -213,19 +282,19 @@
double cpuPercent = 10;
double memoryOnHeap = 10;
double memoryOffHeap = 10;
- double sharedOnHeap = 400;
- double sharedOffHeapNode = 700;
- double sharedOffHeapWorker = 600;
+ double sharedOnHeapWithinWorker = 400;
+ double sharedOffHeapWithinNode = 700;
+ double sharedOffHeapWithinWorker = 600;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(),
spoutParallelism);
builder.setBolt("bolt-1", new TestBolt(),
- boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker, "bolt-1 shared off heap worker")).shuffleGrouping("spout");
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
builder.setBolt("bolt-2", new TestBolt(),
- boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "bolt-2 shared node")).shuffleGrouping("bolt-1");
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
builder.setBolt("bolt-3", new TestBolt(),
- boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeap, "bolt-3 shared worker")).shuffleGrouping("bolt-2");
+ boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
StormTopology stormToplogy = builder.createTopology();
@@ -269,32 +338,32 @@
// Everything should fit in a single slot
int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
- double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
- double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
+ double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
+ double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker;
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
assertThat(assignment.getSlots().size(), is(1));
WorkerSlot ws = assignment.getSlots().iterator().next();
String nodeId = ws.getNodeId();
assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().size(), is(1));
- assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapNode, 0.01));
+ assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapWithinNode, 0.01));
assertThat(assignment.getScheduledResources().size(), is(1));
WorkerResources resources = assignment.getScheduledResources().get(ws);
assertThat(resources.get_cpu(), closeTo(totalExpectedCPU, 0.01));
assertThat(resources.get_mem_on_heap(), closeTo(totalExpectedOnHeap, 0.01));
assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
- assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeap, 0.01));
- assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWorker, 0.01));
+ assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeapWithinWorker, 0.01));
+ assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWithinWorker, 0.01));
} else {
// one worker per executor
int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
// get expected mem on topology rather than per executor
- double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeap;
- double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
- double expectedMemSharedOnHeap = 2 * sharedOnHeap;
- double expectedMemSharedOffHeap = 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
+ double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeapWithinWorker;
+ double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWithinWorker + 2 * sharedOffHeapWithinNode;
+ double expectedMemSharedOnHeap = 2 * sharedOnHeapWithinWorker;
+ double expectedMemSharedOffHeap = 2 * sharedOffHeapWithinWorker + 2 * sharedOffHeapWithinNode;
double expectedMemNonSharedOnHeap = totalNumberOfTasks * memoryOnHeap;
double expectedMemNonSharedOffHeap = totalNumberOfTasks * memoryOffHeap;
assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(expectedMemOnHeap, 0.01));