Merge pull request #3096 from dandsager1/STORM-3480

STORM-3480 Implement One Worker Per Executor RAS Option
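
For reference, a minimal sketch of how a topology opts in to the new option (the topology name and
builder variable are illustrative; only the config key comes from this change):

    // assuming: import org.apache.storm.Config; import org.apache.storm.StormSubmitter;
    Config conf = new Config();
    // Enable one-executor-per-worker scheduling for this topology (RAS strategies only).
    conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
    StormSubmitter.submitTopology("debug-topology", conf, builder.createTopology());
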
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8434b1d..e3c33bd 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -336,6 +336,7 @@
 resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
 topology.ras.constraint.max.state.search: 10_000     # The maximum number of states that will be searched looking for a solution in the constraint solver strategy
 resource.aware.scheduler.constraint.max.state.search: 100_000 # Daemon limit on maximum number of states that will be searched looking for a solution in the constraint solver strategy
+topology.ras.one.executor.per.worker: false
 
 blacklist.scheduler.tolerance.time.secs: 300
 blacklist.scheduler.tolerance.count: 3
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index a2aa58a..5d89fc9 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -316,6 +316,12 @@
     @IsPositiveNumber
     public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search";
     /**
+     * Whether to limit each worker to one executor. This is useful when debugging topologies, to clearly identify workers
+     * that are slow or crashing, and for estimating resource requirements and capacity.
+     */
+    @IsBoolean
+    public static final String TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER = "topology.ras.one.executor.per.worker";
+    /**
      * The maximum number of seconds to spend scheduling a topology using the constraint solver.  Null means no limit.
      */
     @IsInteger
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 0662180..a65057d 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -125,6 +125,11 @@
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
index bb932ed..a34292c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
@@ -37,7 +37,7 @@
     private double assignedNonSharedMemOffHeap;
     private double assignedCpu;
     private TopologyResources(TopologyDetails td, Collection<WorkerResources> workers,
-                              Map<String, Double> sharedOffHeap) {
+                              Map<String, Double> nodeIdToSharedOffHeapNode) {
         requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
         requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
         requestedSharedMemOnHeap = td.getRequestedSharedOnHeap();
@@ -73,17 +73,17 @@
             }
         }
 
-        if (sharedOffHeap != null) {
-            double sharedOff = sharedOffHeap.values().stream().reduce(0.0, (sum, val) -> sum + val);
+        if (nodeIdToSharedOffHeapNode != null) {
+            double sharedOff = nodeIdToSharedOffHeapNode.values().stream().reduce(0.0, (sum, val) -> sum + val);
             assignedSharedMemOffHeap += sharedOff;
             assignedMemOffHeap += sharedOff;
         }
     }
     public TopologyResources(TopologyDetails td, SchedulerAssignment assignment) {
-        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
     }
     public TopologyResources(TopologyDetails td, Assignment assignment) {
-        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
     }
     public TopologyResources() {
         this(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
@@ -142,15 +142,15 @@
         return ret;
     }
 
-    private static Map<String, Double> getNodeIdToSharedOffHeap(SchedulerAssignment assignment) {
+    private static Map<String, Double> getNodeIdToSharedOffHeapNode(SchedulerAssignment assignment) {
         Map<String, Double> ret = null;
         if (assignment != null) {
-            ret = assignment.getNodeIdToTotalSharedOffHeapMemory();
+            ret = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
         }
         return ret;
     }
 
-    private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment assignment) {
+    private static Map<String, Double> getNodeIdToSharedOffHeapNode(Assignment assignment) {
         Map<String, Double> ret = null;
         if (assignment != null) {
             ret = assignment.get_total_shared_off_heap();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 0f09aa7..5e1608a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -54,8 +54,6 @@
  */
 public class Cluster implements ISchedulingState {
     private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
-    private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();
-    private static final Function<String, Map<WorkerSlot, NormalizedResourceRequest>> MAKE_MAP = (x) -> new HashMap<>();
 
     /**
      * key: supervisor id, value: supervisor details.
@@ -80,6 +78,7 @@
     private final Map<String, Object> conf;
     private final Topologies topologies;
     private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
+    private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache;   // node -> topologyId -> shared off-heap node memory (MB)
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
     private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
     private final ResourceMetrics resourceMetrics;
@@ -88,6 +87,14 @@
     private INimbus inimbus;
     private double minWorkerCpu = 0.0;
 
+    private static <K, V> Map<K, V> makeMap(String key) {
+        return new HashMap<>();
+    }
+
+    private static <K> Set<K> makeSet(String key) {
+        return new HashSet<>();
+    }
+
     public Cluster(
         INimbus nimbus,
         ResourceMetrics resourceMetrics,
@@ -148,6 +155,7 @@
         this.resourceMetrics = resourceMetrics;
         this.supervisors.putAll(supervisors);
         this.nodeToScheduledResourcesCache = new HashMap<>(this.supervisors.size());
+        this.nodeToScheduledOffHeapNodeMemoryCache = new HashMap<>();
         this.nodeToUsedSlotsCache = new HashMap<>(this.supervisors.size());
 
         for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
@@ -359,7 +367,7 @@
 
     @Override
     public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
-        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), MAKE_SET)
+        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), Cluster::makeSet)
             .stream()
             .map(WorkerSlot::getPort)
             .collect(Collectors.toSet());
@@ -504,7 +512,7 @@
         }
         for (SharedMemory shared : td.getSharedMemoryRequests(executors)) {
             totalResources.addOffHeap(shared.get_off_heap_worker());
-            totalResources.addOnHeap(shared.get_off_heap_worker());
+            totalResources.addOnHeap(shared.get_on_heap());
 
             addResource(
                 sharedTotalResources,
@@ -578,8 +586,8 @@
             afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
             afterOnHeap = wrAfter.get_mem_on_heap();
 
-            currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
-            afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
+            currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment);
+            afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, exec);
             afterCpuTotal = wrAfter.get_cpu();
         } else {
             WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
@@ -673,9 +681,9 @@
 
         assignment.assign(slot, executors, resources);
         String nodeId = slot.getNodeId();
-        double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
-        assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
-        updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
+        double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment);
+        assignment.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);
+        updateCachesForWorkerSlot(slot, resources, topologyId, sharedOffHeapNodeMemory);
         totalResourcesPerNodeCache.remove(slot.getNodeId());
     }
 
@@ -700,17 +708,17 @@
     }
 
     /**
-     * Calculate the amount of shared off heap memory on a given nodes with the given assignment.
+     * Calculate the amount of shared off heap node memory on a given node with the given assignment.
      *
      * @param nodeId     the id of the node
      * @param assignment the current assignment
-     * @return the amount of shared off heap memory for that node in MB
+     * @return the amount of shared off heap node memory for that node in MB
      */
-    private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentImpl assignment) {
-        return calculateSharedOffHeapMemory(nodeId, assignment, null);
+    private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment) {
+        return calculateSharedOffHeapNodeMemory(nodeId, assignment, null);
     }
 
-    private double calculateSharedOffHeapMemory(
+    private double calculateSharedOffHeapNodeMemory(
         String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
         double memorySharedWithinNode = 0.0;
         TopologyDetails td = topologies.getById(assignment.getTopologyId());
@@ -743,10 +751,10 @@
                 assertValidTopologyForModification(assignment.getTopologyId());
                 assignment.unassignBySlot(slot);
                 String nodeId = slot.getNodeId();
-                assignment.setTotalSharedOffHeapMemory(
-                    nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
-                nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(slot, new NormalizedResourceRequest());
-                nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).remove(slot);
+                assignment.setTotalSharedOffHeapNodeMemory(
+                    nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment));
+                nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
+                nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).remove(slot);
             }
         }
         //Invalidate the cache as something on the node changed
@@ -768,7 +776,7 @@
 
     @Override
     public boolean isSlotOccupied(WorkerSlot slot) {
-        return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), MAKE_SET).contains(slot);
+        return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), Cluster::makeSet).contains(slot);
     }
 
     @Override
@@ -963,7 +971,7 @@
                 sr = sr.add(entry.getValue());
                 ret.put(id, sr);
             }
-            Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory();
+            Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
             if (nodeIdToSharedOffHeap != null) {
                 for (Entry<String, Double> entry : nodeIdToSharedOffHeap.entrySet()) {
                     String id = entry.getKey();
@@ -1003,13 +1011,14 @@
     /**
      * This method updates ScheduledResources and UsedSlots cache for given workerSlot.
      */
-    private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
+    private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, String topologyId,
+                                           Double sharedOffHeapNodeMemory) {
         String nodeId = workerSlot.getNodeId();
         NormalizedResourceRequest normalizedResourceRequest = new NormalizedResourceRequest();
         normalizedResourceRequest.add(workerResources);
-        normalizedResourceRequest.addOffHeap(sharedoffHeapMemory);
-        nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(workerSlot, normalizedResourceRequest);
-        nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
+        nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(workerSlot, normalizedResourceRequest);
+        nodeToScheduledOffHeapNodeMemoryCache.computeIfAbsent(nodeId, Cluster::makeMap).put(topologyId, sharedOffHeapNodeMemory);
+        nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).add(workerSlot);
     }
 
     public ResourceMetrics getResourceMetrics() {
@@ -1019,10 +1028,17 @@
     @Override
     public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
         return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {
+            // executor resources
             NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
-            for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).values()) {
+            for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).values()) {
                 totalScheduledResources.add(req);
             }
+            // shared off heap node memory
+            for (Double offHeapNodeMemory : nodeToScheduledOffHeapNodeMemoryCache
+                    .computeIfAbsent(nid, Cluster::makeMap).values()) {
+                totalScheduledResources.addOffHeap(offHeapNodeMemory);
+            }
+
             return totalScheduledResources;
         });
     }
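
Reviewer note on the new nodeToScheduledOffHeapNodeMemoryCache: it charges SharedOffHeapWithinNode
memory once per (node, topology) pair rather than once per worker slot. A worked sketch using the
numbers from the parameterized test added below (4 single-executor workers, 10 MB off-heap each,
600 MB of shared off-heap node memory, all placed on one node):

    double workerOffHeap = 4 * 10.0;       // per-worker non-shared off-heap, one executor per worker
    double sharedOffHeapNode = 600.0;      // SharedOffHeapWithinNode, counted once for the whole node
    double nodeOffHeapTotal = workerOffHeap + sharedOffHeapNode;   // 640.0 MB, the value the test asserts
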
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
index 347c95f..e2ad2d4 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
@@ -83,9 +83,9 @@
     public Map<WorkerSlot, WorkerResources> getScheduledResources();
 
     /**
-     * Get the total shared off heap memory mapping.
+     * Get the total shared off heap node memory mapping.
      *
-     * @return host to total shared off heap memory mapping.
+     * @return host to total shared off heap node memory mapping.
      */
-    public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory();
+    public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory();
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index 077dafe..ee41630 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -18,12 +18,10 @@
 
 package org.apache.storm.scheduler;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -45,7 +43,7 @@
      */
     private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
     private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
-    private final Map<String, Double> nodeIdToTotalSharedOffHeap = new HashMap<>();
+    private final Map<String, Double> nodeIdToTotalSharedOffHeapNode = new HashMap<>();
     private final Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = new HashMap<>();
 
     /**
@@ -78,7 +76,7 @@
             if (nodeIdToTotalSharedOffHeap.entrySet().stream().anyMatch((entry) -> entry.getKey() == null || entry.getValue() == null)) {
                 throw new RuntimeException("Cannot create off heap with a null in it " + nodeIdToTotalSharedOffHeap);
             }
-            this.nodeIdToTotalSharedOffHeap.putAll(nodeIdToTotalSharedOffHeap);
+            this.nodeIdToTotalSharedOffHeapNode.putAll(nodeIdToTotalSharedOffHeap);
         }
     }
 
@@ -88,7 +86,7 @@
 
     public SchedulerAssignmentImpl(SchedulerAssignment assignment) {
         this(assignment.getTopologyId(), assignment.getExecutorToSlot(),
-             assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapMemory());
+             assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapNodeMemory());
     }
 
     @Override
@@ -132,7 +130,7 @@
         SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
 
         return resources.equals(o.resources)
-               && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
+               && nodeIdToTotalSharedOffHeapNode.equals(o.nodeIdToTotalSharedOffHeapNode);
     }
 
     @Override
@@ -186,7 +184,7 @@
             }
         }
         if (!isFound) {
-            nodeIdToTotalSharedOffHeap.remove(node);
+            nodeIdToTotalSharedOffHeapNode.remove(node);
         }
     }
 
@@ -225,12 +223,12 @@
         return resources;
     }
 
-    public void setTotalSharedOffHeapMemory(String node, double value) {
-        nodeIdToTotalSharedOffHeap.put(node, value);
+    public void setTotalSharedOffHeapNodeMemory(String node, double value) {
+        nodeIdToTotalSharedOffHeapNode.put(node, value);
     }
 
     @Override
-    public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory() {
-        return nodeIdToTotalSharedOffHeap;
+    public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory() {
+        return nodeIdToTotalSharedOffHeapNode;
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index e1cd1cf..ddea221 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -19,6 +19,7 @@
 package org.apache.storm.scheduler.resource;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -163,11 +164,11 @@
      * @return the slots currently assigned to that topology on this node.
      */
     public Collection<WorkerSlot> getUsedSlots(String topId) {
-        Collection<WorkerSlot> ret = null;
         if (topIdToUsedSlots.get(topId) != null) {
-            ret = workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+            return workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+        } else {
+            return Collections.emptySet();
         }
-        return ret;
     }
 
     public boolean isAlive() {
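
Reviewer note: switching getUsedSlots(topId) from returning null to Collections.emptySet() is what lets
the new call site in BaseResourceAwareStrategy (next file) run a membership test without a null guard:

    // from findWorkerForExec below -- safe only because getUsedSlots(topId) is never null anymore
    Collection<WorkerSlot> topologyUsedSlots =
        oneExecutorPerWorker ? node.getUsedSlots(td.getId()) : Collections.emptySet();
    // ...
    if (!topologyUsedSlots.contains(ws)) { /* only then check wouldFit */ }
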
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 15ddbf6..4db19ac 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -45,7 +45,6 @@
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -54,6 +53,7 @@
 public abstract class BaseResourceAwareStrategy implements IStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
     protected Cluster cluster;
+    private boolean oneExecutorPerWorker = false;
     // Rack id to list of host names in that rack
     private Map<String, List<String>> networkTopography;
     private final Map<String, String> superIdToRack = new HashMap<>();
@@ -86,6 +86,10 @@
         logClusterInfo();
     }
 
+    protected void setOneExecutorPerWorker(boolean oneExecutorPerWorker) {
+        this.oneExecutorPerWorker = oneExecutorPerWorker;
+    }
+
     @Override
     public void prepare(Map<String, Object> config) {
         //NOOP
@@ -148,9 +152,12 @@
         for (String id : sortedNodes) {
             RAS_Node node = nodes.getNodeById(id);
             if (node.couldEverFit(exec, td)) {
+                Collection<WorkerSlot> topologyUsedSlots = oneExecutorPerWorker ? node.getUsedSlots(td.getId()) : Collections.emptySet();
                 for (WorkerSlot ws : node.getSlotsAvailableToScheduleOn()) {
-                    if (node.wouldFit(ws, exec, td)) {
-                        return ws;
+                    if (!topologyUsedSlots.contains(ws)) {
+                        if (node.wouldFit(ws, exec, td)) {
+                            return ws;
+                        }
                     }
                 }
             }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 6c3c1f7..9b2a7bd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -39,6 +39,9 @@
 
     @Override
     public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+        boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
+        setOneExecutorPerWorker(oneExecutorPerWorker);
+
         prepare(cluster);
         if (nodes.getNodes().size() <= 0) {
             LOG.warn("No available nodes to schedule tasks on!");
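
One caveat on the cast above: unboxing throws a NullPointerException if the key is absent from the
topology conf (defaults.yaml supplies it on a real cluster, but hand-built configs in tests may omit it).
A null-safe variant, assuming org.apache.storm.utils.ObjectReader is available here as it is elsewhere
in storm-server, could read:

    boolean oneExecutorPerWorker = ObjectReader.getBoolean(
        td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER), false);
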
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesExtension.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesExtension.java
new file mode 100644
index 0000000..b3c8091
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesExtension.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class NormalizedResourcesExtension implements BeforeEachCallback, AfterEachCallback {
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        NormalizedResources.resetResourceNames();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) {
+        NormalizedResources.resetResourceNames();
+    }
+}
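
This extension is the JUnit 5 counterpart of the old NormalizedResourcesRule: a test class attaches it
with @ExtendWith instead of @Rule, as the updated test below does. A minimal sketch (the class name is
a placeholder):

    @ExtendWith(NormalizedResourcesExtension.class)
    public class TestSomeScheduler { /* resource-name state is reset before and after each test */ }
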
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index d72d362..fa4ad78 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -18,8 +18,9 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
+import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.scheduler.IScheduler;
-import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesRule;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
 import java.util.Collections;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
@@ -42,20 +43,24 @@
 import org.apache.storm.topology.SharedOffHeapWithinWorker;
 import org.apache.storm.topology.SharedOnHeap;
 import org.apache.storm.topology.TopologyBuilder;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.*;
 import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -67,11 +72,17 @@
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 
+@ExtendWith({NormalizedResourcesExtension.class})
 public class TestDefaultResourceAwareStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(TestDefaultResourceAwareStrategy.class);
 
     private static final int CURRENT_TIME = 1450418597;
     private static IScheduler scheduler = null;
+    private enum SharedMemoryType {
+        SHARED_OFF_HEAP_NODE,
+        SHARED_OFF_HEAP_WORKER,
+        SHARED_ON_HEAP_WORKER
+    }
 
     private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
         private final Map<String, String> result;
@@ -93,10 +104,7 @@
         }
     };
 
-    @Rule
-    public NormalizedResourcesRule nrRule = new NormalizedResourcesRule();
-
-    @After
+    @AfterEach
     public void cleanup() {
         if (scheduler != null) {
             scheduler.cleanup();
@@ -104,20 +112,111 @@
         }
     }
 
-    /**
-     * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
+    /**
+     * test assigned memory for each shared memory type with oneExecutorPerWorker enabled
      */
-    @Test
-    public void testDefaultResourceAwareStrategySharedMemory() {
+    @ParameterizedTest
+    @EnumSource(SharedMemoryType.class)
+    public void testMultipleSharedMemoryWithOneExecutorPerWorker(SharedMemoryType memoryType) {
+        int spoutParallelism = 4;
+        double cpuPercent = 10;
+        double memoryOnHeap = 10;
+        double memoryOffHeap = 10;
+        double sharedOnHeapWithinWorker = 450;
+        double sharedOffHeapNode = 600;
+        double sharedOffHeapWithinWorker = 400;
+
+        TopologyBuilder builder = new TopologyBuilder();
+        switch (memoryType) {
+            case SHARED_OFF_HEAP_NODE:
+                builder.setSpout("spout", new TestSpout(), spoutParallelism)
+                        .addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "spout shared off heap node"));
+                break;
+            case SHARED_OFF_HEAP_WORKER:
+                builder.setSpout("spout", new TestSpout(), spoutParallelism)
+                        .addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "spout shared off heap worker"));
+                break;
+            case SHARED_ON_HEAP_WORKER:
+                builder.setSpout("spout", new TestSpout(), spoutParallelism)
+                        .addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "spout shared on heap worker"));
+                break;
+        }
+        StormTopology stormTopology = builder.createTopology();
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 1000);
+        Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
+
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+        conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
+        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormTopology, 0,
+                genExecsAndComps(stormTopology), CURRENT_TIME, "user");
+
+        Topologies topologies = new Topologies(topo);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+
+        TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
+        SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+        long numNodes = assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+
+        switch (memoryType) {
+            case SHARED_OFF_HEAP_NODE:
+                // 4 workers on a single node. OffHeapNode memory is shared across them
+                assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap + sharedOffHeapNode, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(sharedOffHeapNode, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(numNodes, is(1L));
+                assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+                break;
+            case SHARED_OFF_HEAP_WORKER:
+                // 4 workers on 2 nodes. OffHeapWorker memory not shared -- consumed 4x, once for each worker
+                assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * (memoryOffHeap + sharedOffHeapWithinWorker), 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(spoutParallelism * sharedOffHeapWithinWorker, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(numNodes, is(2L));
+                assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+                break;
+            case SHARED_ON_HEAP_WORKER:
+                // 4 workers on 2 nodes. onHeap memory not shared -- consumed 4x, once for each worker
+                assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * (memoryOnHeap + sharedOnHeapWithinWorker), 0.01));
+                assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(spoutParallelism * sharedOnHeapWithinWorker, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(0, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
+                assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
+                assertThat(numNodes, is(2L));
+                assertThat(cluster.getAssignedNumWorkers(topo), is(spoutParallelism));
+                break;
+        }
+    }
+
+    /**
+     * test that shared memory is scheduled correctly with and without oneExecutorPerWorker enabled
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDefaultResourceAwareStrategySharedMemory(boolean oneExecutorPerWorker) {
         int spoutParallelism = 2;
         int boltParallelism = 2;
         int numBolts = 3;
         double cpuPercent = 10;
         double memoryOnHeap = 10;
         double memoryOffHeap = 10;
-        double sharedOnHeap = 500;
+        double sharedOnHeap = 400;
         double sharedOffHeapNode = 700;
-        double sharedOffHeapWorker = 500;
+        double sharedOffHeapWorker = 600;
+
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("spout", new TestSpout(),
                 spoutParallelism);
@@ -133,10 +232,11 @@
         INimbus iNimbus = new INimbusTest();
         Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 2000);
         Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
-        
+
         conf.put(Config.TOPOLOGY_PRIORITY, 0);
         conf.put(Config.TOPOLOGY_NAME, "testTopology");
         conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+        conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, oneExecutorPerWorker);
         TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
                 genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
 
@@ -144,10 +244,20 @@
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
 
         scheduler = new ResourceAwareScheduler();
-
         scheduler.prepare(conf);
         scheduler.schedule(topologies, cluster);
-        
+
+        // expected scheduling when oneExecutorPerWorker is enabled
+        // sorted executor ordering: [3,3] [7,7] [0,0] [2,2] [6,6] [1,1] [5,5] [4,4]
+        // spout  [0,0] [1,1]
+        // bolt-1 [2,2] [3,3]
+        // bolt-2 [6,6] [7,7]
+        // bolt-3 [4,4] [5,5]
+        //
+        // expect 8 workers over 2 nodes
+        // node r000s000 workers: bolt-1 bolt-2 spout bolt-1 (no memory sharing)
+        // node r000s001 workers: bolt-2 spout bolt-3 bolt-3 (no memory sharing)
+
         for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
             String supervisorId = entry.getKey();
             SupervisorResources resources = entry.getValue();
@@ -155,28 +265,60 @@
             assertTrue(supervisorId, resources.getTotalMem() >= resources.getUsedMem());
         }
 
-        // Everything should fit in a single slot
-        int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
-        double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
-        double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
-        double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
-        
-        SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
-        assertEquals(1, assignment.getSlots().size());
-        WorkerSlot ws = assignment.getSlots().iterator().next();
-        String nodeId = ws.getNodeId();
-        assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
-        assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
-        assertEquals(1, assignment.getScheduledResources().size());
-        WorkerResources resources = assignment.getScheduledResources().get(ws);
-        assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
-        assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
-        assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
-        assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
-        assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+        if (!oneExecutorPerWorker) {
+            // Everything should fit in a single slot
+            int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
+            double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
+            double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
+            double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
+
+            SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+            assertThat(assignment.getSlots().size(), is(1));
+            WorkerSlot ws = assignment.getSlots().iterator().next();
+            String nodeId = ws.getNodeId();
+            assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().size(), is(1));
+            assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapNode, 0.01));
+            assertThat(assignment.getScheduledResources().size(), is(1));
+            WorkerResources resources = assignment.getScheduledResources().get(ws);
+            assertThat(resources.get_cpu(), closeTo(totalExpectedCPU, 0.01));
+            assertThat(resources.get_mem_on_heap(), closeTo(totalExpectedOnHeap, 0.01));
+            assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
+            assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeap, 0.01));
+            assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWorker, 0.01));
+        } else {
+            // one worker per executor
+            int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
+            TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
+
+            // get expected mem on topology rather than per executor
+            double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeap;
+            double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
+            double expectedMemSharedOnHeap = 2 * sharedOnHeap;
+            double expectedMemSharedOffHeap = 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
+            double expectedMemNonSharedOnHeap = totalNumberOfTasks * memoryOnHeap;
+            double expectedMemNonSharedOffHeap = totalNumberOfTasks * memoryOffHeap;
+            assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(expectedMemOnHeap, 0.01));
+            assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(expectedMemOffHeap, 0.01));
+            assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(expectedMemSharedOnHeap, 0.01));
+            assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(expectedMemSharedOffHeap, 0.01));
+            assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(expectedMemNonSharedOnHeap, 0.01));
+            assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(expectedMemNonSharedOffHeap, 0.01));
+
+            double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
+            assertThat(topologyResources.getAssignedCpu(), closeTo(totalExpectedCPU, 0.01));
+
+            // expect 8 workers
+            SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+            int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
+            assertThat(numAssignedWorkers, is(8));
+            assertThat(assignment.getSlots().size(), is(8));
+
+            // expect 2 nodes
+            long numNodes = assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+            assertThat(numNodes, is(2L));
+        }
     }
     
-    
     /**
      * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
      */
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 037d226..033b3cf 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -140,7 +140,7 @@
         
         SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
         Set<WorkerSlot> slots = assignment.getSlots();
-        Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
+        Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
         LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
         Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
         assertEquals(2, slots.size());