Merge pull request #3096 from dandsager1/STORM-3480
STORM-3480 Implement One Worker Per Executor RAS Option
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8434b1d..e3c33bd 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -336,6 +336,7 @@
resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
topology.ras.constraint.max.state.search: 10_000 # The maximum number of states that will be searched looking for a solution in the constraint solver strategy
resource.aware.scheduler.constraint.max.state.search: 100_000 # Daemon limit on maximum number of states that will be searched looking for a solution in the constraint solver strategy
+topology.ras.one.executor.per.worker: false
blacklist.scheduler.tolerance.time.secs: 300
blacklist.scheduler.tolerance.count: 3
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index a2aa58a..5d89fc9 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -316,6 +316,12 @@
@IsPositiveNumber
public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search";
/**
+ * Whether to limit each worker to one executor. This is useful for debugging topologies to clearly identify workers that
+ * are slow/crashing and for estimating resource requirements and capacity.
+ */
+ @IsBoolean
+ public static final String TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER = "topology.ras.one.executor.per.worker";
+ /**
* The maximum number of seconds to spend scheduling a topology using the constraint solver. Null means no limit.
*/
@IsInteger
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 0662180..a65057d 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -125,6 +125,11 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
index bb932ed..a34292c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
@@ -37,7 +37,7 @@
private double assignedNonSharedMemOffHeap;
private double assignedCpu;
private TopologyResources(TopologyDetails td, Collection<WorkerResources> workers,
- Map<String, Double> sharedOffHeap) {
+ Map<String, Double> nodeIdToSharedOffHeapNode) {
requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
requestedSharedMemOnHeap = td.getRequestedSharedOnHeap();
@@ -73,17 +73,17 @@
}
}
- if (sharedOffHeap != null) {
- double sharedOff = sharedOffHeap.values().stream().reduce(0.0, (sum, val) -> sum + val);
+ if (nodeIdToSharedOffHeapNode != null) {
+ double sharedOff = nodeIdToSharedOffHeapNode.values().stream().reduce(0.0, (sum, val) -> sum + val);
assignedSharedMemOffHeap += sharedOff;
assignedMemOffHeap += sharedOff;
}
}
public TopologyResources(TopologyDetails td, SchedulerAssignment assignment) {
- this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+ this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
}
public TopologyResources(TopologyDetails td, Assignment assignment) {
- this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+ this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
}
public TopologyResources() {
this(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
@@ -142,15 +142,15 @@
return ret;
}
- private static Map<String, Double> getNodeIdToSharedOffHeap(SchedulerAssignment assignment) {
+ private static Map<String, Double> getNodeIdToSharedOffHeapNode(SchedulerAssignment assignment) {
Map<String, Double> ret = null;
if (assignment != null) {
- ret = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ ret = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
}
return ret;
}
- private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment assignment) {
+ private static Map<String, Double> getNodeIdToSharedOffHeapNode(Assignment assignment) {
Map<String, Double> ret = null;
if (assignment != null) {
ret = assignment.get_total_shared_off_heap();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 0f09aa7..5e1608a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -54,8 +54,6 @@
*/
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
- private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();
- private static final Function<String, Map<WorkerSlot, NormalizedResourceRequest>> MAKE_MAP = (x) -> new HashMap<>();
/**
* key: supervisor id, value: supervisor details.
@@ -80,6 +78,7 @@
private final Map<String, Object> conf;
private final Topologies topologies;
private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
+    private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache; // nodeId -> topologyId -> shared off-heap node memory (MB)
private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
private final ResourceMetrics resourceMetrics;
@@ -88,6 +87,14 @@
private INimbus inimbus;
private double minWorkerCpu = 0.0;
+ private static <K, V> Map<K, V> makeMap(String key) {
+ return new HashMap<>();
+ }
+
+ private static <K> Set<K> makeSet(String key) {
+ return new HashSet<>();
+ }
+
public Cluster(
INimbus nimbus,
ResourceMetrics resourceMetrics,
@@ -148,6 +155,7 @@
this.resourceMetrics = resourceMetrics;
this.supervisors.putAll(supervisors);
this.nodeToScheduledResourcesCache = new HashMap<>(this.supervisors.size());
+ this.nodeToScheduledOffHeapNodeMemoryCache = new HashMap<>();
this.nodeToUsedSlotsCache = new HashMap<>(this.supervisors.size());
for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
@@ -359,7 +367,7 @@
@Override
public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
- return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), MAKE_SET)
+ return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), Cluster::makeSet)
.stream()
.map(WorkerSlot::getPort)
.collect(Collectors.toSet());
@@ -504,7 +512,7 @@
}
for (SharedMemory shared : td.getSharedMemoryRequests(executors)) {
totalResources.addOffHeap(shared.get_off_heap_worker());
- totalResources.addOnHeap(shared.get_off_heap_worker());
+ totalResources.addOnHeap(shared.get_on_heap());
addResource(
sharedTotalResources,
@@ -578,8 +586,8 @@
afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
afterOnHeap = wrAfter.get_mem_on_heap();
- currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
- afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
+ currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment);
+ afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, exec);
afterCpuTotal = wrAfter.get_cpu();
} else {
WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
@@ -673,9 +681,9 @@
assignment.assign(slot, executors, resources);
String nodeId = slot.getNodeId();
- double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
- assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
- updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
+ double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment);
+ assignment.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);
+ updateCachesForWorkerSlot(slot, resources, topologyId, sharedOffHeapNodeMemory);
totalResourcesPerNodeCache.remove(slot.getNodeId());
}
@@ -700,17 +708,17 @@
}
/**
- * Calculate the amount of shared off heap memory on a given nodes with the given assignment.
+ * Calculate the amount of shared off heap node memory on a given node with the given assignment.
*
* @param nodeId the id of the node
* @param assignment the current assignment
- * @return the amount of shared off heap memory for that node in MB
+ * @return the amount of shared off heap node memory for that node in MB
*/
- private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentImpl assignment) {
- return calculateSharedOffHeapMemory(nodeId, assignment, null);
+ private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment) {
+ return calculateSharedOffHeapNodeMemory(nodeId, assignment, null);
}
- private double calculateSharedOffHeapMemory(
+ private double calculateSharedOffHeapNodeMemory(
String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
double memorySharedWithinNode = 0.0;
TopologyDetails td = topologies.getById(assignment.getTopologyId());
@@ -743,10 +751,10 @@
assertValidTopologyForModification(assignment.getTopologyId());
assignment.unassignBySlot(slot);
String nodeId = slot.getNodeId();
- assignment.setTotalSharedOffHeapMemory(
- nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
- nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(slot, new NormalizedResourceRequest());
- nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).remove(slot);
+ assignment.setTotalSharedOffHeapNodeMemory(
+ nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment));
+ nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
+ nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).remove(slot);
}
}
//Invalidate the cache as something on the node changed
@@ -768,7 +776,7 @@
@Override
public boolean isSlotOccupied(WorkerSlot slot) {
- return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), MAKE_SET).contains(slot);
+ return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), Cluster::makeSet).contains(slot);
}
@Override
@@ -963,7 +971,7 @@
sr = sr.add(entry.getValue());
ret.put(id, sr);
}
- Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
if (nodeIdToSharedOffHeap != null) {
for (Entry<String, Double> entry : nodeIdToSharedOffHeap.entrySet()) {
String id = entry.getKey();
@@ -1003,13 +1011,14 @@
/**
* This method updates ScheduledResources and UsedSlots cache for given workerSlot.
*/
- private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
+ private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, String topologyId,
+ Double sharedOffHeapNodeMemory) {
String nodeId = workerSlot.getNodeId();
NormalizedResourceRequest normalizedResourceRequest = new NormalizedResourceRequest();
normalizedResourceRequest.add(workerResources);
- normalizedResourceRequest.addOffHeap(sharedoffHeapMemory);
- nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(workerSlot, normalizedResourceRequest);
- nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
+ nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(workerSlot, normalizedResourceRequest);
+ nodeToScheduledOffHeapNodeMemoryCache.computeIfAbsent(nodeId, Cluster::makeMap).put(topologyId, sharedOffHeapNodeMemory);
+ nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).add(workerSlot);
}
public ResourceMetrics getResourceMetrics() {
@@ -1019,10 +1028,17 @@
@Override
public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {
+ // executor resources
NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
- for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).values()) {
+ for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).values()) {
totalScheduledResources.add(req);
}
+ // shared off heap node memory
+ for (Double offHeapNodeMemory : nodeToScheduledOffHeapNodeMemoryCache.
+ computeIfAbsent(nid, Cluster::makeMap).values()) {
+ totalScheduledResources.addOffHeap(offHeapNodeMemory);
+ }
+
return totalScheduledResources;
});
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
index 347c95f..e2ad2d4 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
@@ -83,9 +83,9 @@
public Map<WorkerSlot, WorkerResources> getScheduledResources();
/**
- * Get the total shared off heap memory mapping.
+ * Get the total shared off heap node memory mapping.
*
- * @return host to total shared off heap memory mapping.
+ * @return host to total shared off heap node memory mapping.
*/
- public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory();
+ public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory();
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index 077dafe..ee41630 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -18,12 +18,10 @@
package org.apache.storm.scheduler;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -45,7 +43,7 @@
*/
private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
- private final Map<String, Double> nodeIdToTotalSharedOffHeap = new HashMap<>();
+ private final Map<String, Double> nodeIdToTotalSharedOffHeapNode = new HashMap<>();
private final Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = new HashMap<>();
/**
@@ -78,7 +76,7 @@
if (nodeIdToTotalSharedOffHeap.entrySet().stream().anyMatch((entry) -> entry.getKey() == null || entry.getValue() == null)) {
throw new RuntimeException("Cannot create off heap with a null in it " + nodeIdToTotalSharedOffHeap);
}
- this.nodeIdToTotalSharedOffHeap.putAll(nodeIdToTotalSharedOffHeap);
+ this.nodeIdToTotalSharedOffHeapNode.putAll(nodeIdToTotalSharedOffHeap);
}
}
@@ -88,7 +86,7 @@
public SchedulerAssignmentImpl(SchedulerAssignment assignment) {
this(assignment.getTopologyId(), assignment.getExecutorToSlot(),
- assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapMemory());
+ assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapNodeMemory());
}
@Override
@@ -132,7 +130,7 @@
SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
return resources.equals(o.resources)
- && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
+ && nodeIdToTotalSharedOffHeapNode.equals(o.nodeIdToTotalSharedOffHeapNode);
}
@Override
@@ -186,7 +184,7 @@
}
}
if (!isFound) {
- nodeIdToTotalSharedOffHeap.remove(node);
+ nodeIdToTotalSharedOffHeapNode.remove(node);
}
}
@@ -225,12 +223,12 @@
return resources;
}
- public void setTotalSharedOffHeapMemory(String node, double value) {
- nodeIdToTotalSharedOffHeap.put(node, value);
+ public void setTotalSharedOffHeapNodeMemory(String node, double value) {
+ nodeIdToTotalSharedOffHeapNode.put(node, value);
}
@Override
- public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory() {
- return nodeIdToTotalSharedOffHeap;
+ public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory() {
+ return nodeIdToTotalSharedOffHeapNode;
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index e1cd1cf..ddea221 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -19,6 +19,7 @@
package org.apache.storm.scheduler.resource;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -163,11 +164,11 @@
* @return the slots currently assigned to that topology on this node.
*/
public Collection<WorkerSlot> getUsedSlots(String topId) {
- Collection<WorkerSlot> ret = null;
if (topIdToUsedSlots.get(topId) != null) {
- ret = workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+ return workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+ } else {
+ return Collections.emptySet();
}
- return ret;
}
public boolean isAlive() {
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 15ddbf6..4db19ac 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -45,7 +45,6 @@
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -54,6 +53,7 @@
public abstract class BaseResourceAwareStrategy implements IStrategy {
private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
protected Cluster cluster;
+ private boolean oneExecutorPerWorker = false;
// Rack id to list of host names in that rack
private Map<String, List<String>> networkTopography;
private final Map<String, String> superIdToRack = new HashMap<>();
@@ -86,6 +86,10 @@
logClusterInfo();
}
+ protected void setOneExecutorPerWorker(boolean oneExecutorPerWorker) {
+ this.oneExecutorPerWorker = oneExecutorPerWorker;
+ }
+
@Override
public void prepare(Map<String, Object> config) {
//NOOP
@@ -148,9 +152,12 @@
for (String id : sortedNodes) {
RAS_Node node = nodes.getNodeById(id);
if (node.couldEverFit(exec, td)) {
+ Collection<WorkerSlot> topologyUsedSlots = oneExecutorPerWorker ? node.getUsedSlots(td.getId()) : Collections.emptySet();
for (WorkerSlot ws : node.getSlotsAvailableToScheduleOn()) {
- if (node.wouldFit(ws, exec, td)) {
- return ws;
+ if (!topologyUsedSlots.contains(ws)) {
+ if (node.wouldFit(ws, exec, td)) {
+ return ws;
+ }
}
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 6c3c1f7..9b2a7bd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -39,6 +39,9 @@
@Override
public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+ boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
+ setOneExecutorPerWorker(oneExecutorPerWorker);
+
prepare(cluster);
if (nodes.getNodes().size() <= 0) {
LOG.warn("No available nodes to schedule tasks on!");
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesExtension.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesExtension.java
new file mode 100644
index 0000000..b3c8091
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesExtension.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class NormalizedResourcesExtension implements BeforeEachCallback, AfterEachCallback {
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ NormalizedResources.resetResourceNames();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) {
+ NormalizedResources.resetResourceNames();
+ }
+}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index d72d362..fa4ad78 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -18,8 +18,9 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
+import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.scheduler.IScheduler;
-import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesRule;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
import java.util.Collections;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
@@ -42,20 +43,24 @@
import org.apache.storm.topology.SharedOffHeapWithinWorker;
import org.apache.storm.topology.SharedOnHeap;
import org.apache.storm.topology.TopologyBuilder;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -67,11 +72,17 @@
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+@ExtendWith({NormalizedResourcesExtension.class})
public class TestDefaultResourceAwareStrategy {
private static final Logger LOG = LoggerFactory.getLogger(TestDefaultResourceAwareStrategy.class);
private static final int CURRENT_TIME = 1450418597;
private static IScheduler scheduler = null;
+ private enum SharedMemoryType {
+ SHARED_OFF_HEAP_NODE,
+ SHARED_OFF_HEAP_WORKER,
+ SHARED_ON_HEAP_WORKER
+ };
private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
private final Map<String, String> result;
@@ -93,10 +104,7 @@
}
};
- @Rule
- public NormalizedResourcesRule nrRule = new NormalizedResourcesRule();
-
- @After
+ @AfterEach
public void cleanup() {
if (scheduler != null) {
scheduler.cleanup();
@@ -104,20 +112,111 @@
}
}
- /**
- * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
+    /**
+     * Test assigned memory with each shared memory type when oneExecutorPerWorker is enabled.
*/
- @Test
- public void testDefaultResourceAwareStrategySharedMemory() {
+ @ParameterizedTest
+ @EnumSource(SharedMemoryType.class)
+ public void testMultipleSharedMemoryWithOneExecutorPerWorker(SharedMemoryType memoryType) {
+ int spoutParallelism = 4;
+ double cpuPercent = 10;
+ double memoryOnHeap = 10;
+ double memoryOffHeap = 10;
+ double sharedOnHeapWithinWorker = 450;
+ double sharedOffHeapNode = 600;
+ double sharedOffHeapWithinWorker = 400;
+
+ TopologyBuilder builder = new TopologyBuilder();
+ switch (memoryType) {
+ case SHARED_OFF_HEAP_NODE:
+ builder.setSpout("spout", new TestSpout(), spoutParallelism)
+ .addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "spout shared off heap node"));
+ break;
+ case SHARED_OFF_HEAP_WORKER:
+ builder.setSpout("spout", new TestSpout(), spoutParallelism)
+ .addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "spout shared off heap worker"));
+ break;
+ case SHARED_ON_HEAP_WORKER:
+ builder.setSpout("spout", new TestSpout(), spoutParallelism)
+ .addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "spout shared on heap worker"));
+ break;
+ }
+ StormTopology stormToplogy = builder.createTopology();
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 1000);
+ Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
+
+ conf.put(Config.TOPOLOGY_PRIORITY, 0);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology");
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+ conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
+ TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+ Topologies topologies = new Topologies(topo);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(conf);
+ scheduler.schedule(topologies, cluster);
+
+ TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
+ SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+ long numNodes = assignment.getSlotToExecutors().keySet().stream().map(ws -> ws.getNodeId()).distinct().count();
+
+ switch (memoryType) {
+ case SHARED_OFF_HEAP_NODE:
+ // 4 workers on single node. OffHeapNode memory is shared
+ assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap + sharedOffHeapNode, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(sharedOffHeapNode, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+ assertThat(numNodes, is(1L));
+ assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+ break;
+ case SHARED_OFF_HEAP_WORKER:
+                // 4 workers on 2 nodes. OffHeapWorker memory not shared -- consumed 4x, once for each worker
+ assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * (memoryOffHeap + sharedOffHeapWithinWorker), 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(spoutParallelism * sharedOffHeapWithinWorker, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+ assertThat(numNodes, is(2L));
+ assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+ break;
+ case SHARED_ON_HEAP_WORKER:
+ // 4 workers on 2 nodes. onHeap memory not shared -- consumed 4x, once for each worker
+ assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * (memoryOnHeap + sharedOnHeapWithinWorker), 0.01));
+ assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(spoutParallelism * sharedOnHeapWithinWorker, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(0, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+ assertThat(numNodes, is(2L));
+ assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+ break;
+ }
+ }
+
+ /**
+ * test if the scheduling shared memory is correct with/without oneExecutorPerWorker enabled
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testDefaultResourceAwareStrategySharedMemory(boolean oneExecutorPerWorker) {
int spoutParallelism = 2;
int boltParallelism = 2;
int numBolts = 3;
double cpuPercent = 10;
double memoryOnHeap = 10;
double memoryOffHeap = 10;
- double sharedOnHeap = 500;
+ double sharedOnHeap = 400;
double sharedOffHeapNode = 700;
- double sharedOffHeapWorker = 500;
+ double sharedOffHeapWorker = 600;
+
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(),
spoutParallelism);
@@ -133,10 +232,11 @@
INimbus iNimbus = new INimbusTest();
Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 2000);
Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
-
+
conf.put(Config.TOPOLOGY_PRIORITY, 0);
conf.put(Config.TOPOLOGY_NAME, "testTopology");
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+ conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, oneExecutorPerWorker);
TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
@@ -144,10 +244,20 @@
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
scheduler = new ResourceAwareScheduler();
-
scheduler.prepare(conf);
scheduler.schedule(topologies, cluster);
-
+
+ // one worker per executor scheduling
+ // [3,3] [7,7], [0,0] [2,2] [6,6] [1,1] [5,5] [4,4] sorted executor ordering
+ // spout [0,0] [1,1]
+ // bolt-1 [2,2] [3,3]
+ // bolt-2 [6,6] [7,7]
+ // bolt-3 [4,4] [5,5]
+ //
+ // expect 8 workers over 2 nodes
+ // node r000s000 workers: bolt-1 bolt-2 spout bolt-1 (no memory sharing)
+ // node r000s001 workers: bolt-2 spout bolt-3 bolt-3 (no memory sharing)
+
for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
String supervisorId = entry.getKey();
SupervisorResources resources = entry.getValue();
@@ -155,28 +265,60 @@
assertTrue(supervisorId, resources.getTotalMem() >= resources.getUsedMem());
}
- // Everything should fit in a single slot
- int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
- double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
- double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
- double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
-
- SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
- assertEquals(1, assignment.getSlots().size());
- WorkerSlot ws = assignment.getSlots().iterator().next();
- String nodeId = ws.getNodeId();
- assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
- assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
- assertEquals(1, assignment.getScheduledResources().size());
- WorkerResources resources = assignment.getScheduledResources().get(ws);
- assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
- assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
- assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
- assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
- assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+ if (!oneExecutorPerWorker) {
+ // Everything should fit in a single slot
+ int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
+ double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
+ double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
+ double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
+
+ SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+ assertThat(assignment.getSlots().size(), is(1));
+ WorkerSlot ws = assignment.getSlots().iterator().next();
+ String nodeId = ws.getNodeId();
+ assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().size(), is(1));
+ assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapNode, 0.01));
+ assertThat(assignment.getScheduledResources().size(), is(1));
+ WorkerResources resources = assignment.getScheduledResources().get(ws);
+ assertThat(resources.get_cpu(), closeTo(totalExpectedCPU, 0.01));
+ assertThat(resources.get_mem_on_heap(), closeTo(totalExpectedOnHeap, 0.01));
+ assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
+ assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeap, 0.01));
+ assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWorker, 0.01));
+ } else {
+ // one worker per executor
+ int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
+ TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
+
+ // get expected mem on topology rather than per executor
+ double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeap;
+ double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
+ double expectedMemSharedOnHeap = 2 * sharedOnHeap;
+ double expectedMemSharedOffHeap = 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
+ double expectedMemNonSharedOnHeap = totalNumberOfTasks * memoryOnHeap;
+ double expectedMemNonSharedOffHeap = totalNumberOfTasks * memoryOffHeap;
+ assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(expectedMemOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(expectedMemOffHeap, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(expectedMemSharedOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(expectedMemSharedOffHeap, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(expectedMemNonSharedOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(expectedMemNonSharedOffHeap, 0.01));
+
+ double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
+ assertThat(topologyResources.getAssignedCpu(), closeTo(totalExpectedCPU, 0.01));
+
+ // expect 8 workers
+ SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+ int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
+ assertThat(numAssignedWorkers, is(8));
+ assertThat(assignment.getSlots().size(), is(8));
+
+ // expect 2 nodes
+ long numNodes = assignment.getSlotToExecutors().keySet().stream().map(ws -> ws.getNodeId()).distinct().count();
+ assertThat(numNodes, is(2L));
+ }
}
-
/**
* test if the scheduling logic for the DefaultResourceAwareStrategy is correct
*/
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 037d226..033b3cf 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -140,7 +140,7 @@
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
Set<WorkerSlot> slots = assignment.getSlots();
- Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
assertEquals(2, slots.size());