STORM-3488 Scheduling can cause RAS_Node resources to become negative
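Root cause: in Cluster#wouldFit, the post-assignment ("after") memory totals were computed only on the branch where the target slot already had worker resources; the else branch updated only the CPU total, leaving afterTotal and afterOnHeap at 0.0. The memory-fit check then compared a negative "memory added" value against the available memory and always passed, so scheduling could overcommit a node and drive its available resources negative. The patch below hoists the "after" calculation out of the conditional and passes the TopologyDetails being scheduled into calculateSharedOffHeapNodeMemory instead of re-deriving it from the assignment.

A minimal, self-contained sketch of the arithmetic (illustrative only, with hypothetical values; this is not the Storm source):

public class NegativeResourceSketch {
    public static void main(String[] args) {
        double nodeTotalMemMb = 2000.0;  // memory capacity of the node
        double currentTotalMb = 1800.0;  // memory already scheduled on it
        double executorMemMb = 500.0;    // memory the candidate executor needs
        double memoryAvailable = nodeTotalMemMb - currentTotalMb;            // 200.0

        // Buggy branch: afterTotal was left at 0.0, so the computed
        // "memory added" is negative and the fit check trivially passes.
        double afterTotalBuggy = 0.0;
        double memoryAddedBuggy = afterTotalBuggy - currentTotalMb;          // -1800.0
        System.out.println("buggy fit: " + (memoryAddedBuggy <= memoryAvailable));  // true

        // Fixed behavior: always recompute the would-be worker resources.
        double afterTotalFixed = currentTotalMb + executorMemMb;             // 2300.0
        double memoryAddedFixed = afterTotalFixed - currentTotalMb;          // 500.0
        System.out.println("fixed fit: " + (memoryAddedFixed <= memoryAvailable)); // false

        // Trusting the buggy check leaves the node 300 MB in the red.
        System.out.println((memoryAvailable - executorMemMb) + " MB left");  // -300.0 MB left
    }
}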
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 756eb79..04f0960 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -564,11 +564,7 @@
         }
 
         double currentTotal = 0.0;
-        double afterTotal = 0.0;
-        double afterOnHeap = 0.0;
-
         double currentCpuTotal = 0.0;
-        double afterCpuTotal = 0.0;
 
         Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
         wouldBeAssigned.add(exec);
@@ -582,18 +578,17 @@
                 currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
                 currentCpuTotal = wrCurrent.get_cpu();
             }
-            WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
-            afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
-            afterOnHeap = wrAfter.get_mem_on_heap();
 
-            currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment);
-            afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, exec);
-            afterCpuTotal = wrAfter.get_cpu();
-        } else {
-            WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
-            afterCpuTotal = wrAfter.get_cpu();
+            currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td);
         }
 
+        WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+        double afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
+        afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td, exec);
+
+        double afterOnHeap = wrAfter.get_mem_on_heap();
+        double afterCpuTotal = wrAfter.get_cpu();
+
         double cpuAdded = afterCpuTotal - currentCpuTotal;
         double cpuAvailable = resourcesAvailable.getTotalCpu();
 
@@ -681,7 +676,7 @@
 
         assignment.assign(slot, executors, resources);
         String nodeId = slot.getNodeId();
-        double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment);
+        double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment, td);
         assignment.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);
         updateCachesForWorkerSlot(slot, resources, topologyId, sharedOffHeapNodeMemory);
         totalResourcesPerNodeCache.remove(slot.getNodeId());
@@ -712,27 +707,28 @@
      *
      * @param nodeId     the id of the node
      * @param assignment the current assignment
+     * @param td         the topology details
      * @return the amount of shared off heap node memory for that node in MB
      */
-    private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment) {
-        return calculateSharedOffHeapNodeMemory(nodeId, assignment, null);
+    private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td) {
+        return calculateSharedOffHeapNodeMemory(nodeId, assignment, td, null);
     }
 
     private double calculateSharedOffHeapNodeMemory(
-        String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
-        double memorySharedWithinNode = 0.0;
-        TopologyDetails td = topologies.getById(assignment.getTopologyId());
+        String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td, ExecutorDetails extra) {
         Set<ExecutorDetails> executorsOnNode = new HashSet<>();
-        for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
-            assignment.getSlotToExecutors().entrySet()) {
-            if (nodeId.equals(entry.getKey().getNodeId())) {
-                executorsOnNode.addAll(entry.getValue());
+        if (assignment != null) {
+            for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry : assignment.getSlotToExecutors().entrySet()) {
+                if (nodeId.equals(entry.getKey().getNodeId())) {
+                    executorsOnNode.addAll(entry.getValue());
+                }
             }
         }
         if (extra != null) {
             executorsOnNode.add(extra);
         }
         //Now check for overlap on the node
+        double memorySharedWithinNode = 0.0;
         for (SharedMemory shared : td.getSharedMemoryRequests(executorsOnNode)) {
             memorySharedWithinNode += shared.get_off_heap_node();
         }
@@ -751,8 +747,9 @@
                 assertValidTopologyForModification(assignment.getTopologyId());
                 assignment.unassignBySlot(slot);
                 String nodeId = slot.getNodeId();
+                TopologyDetails td = topologies.getById(assignment.getTopologyId());
                 assignment.setTotalSharedOffHeapNodeMemory(
-                    nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment));
+                    nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment, td));
                 nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
                 nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).remove(slot);
             }
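The regression test added below reproduces the scenario: a lower-priority topology using shared off-heap node memory is fully scheduled first, then a higher-priority topology is submitted, forcing eviction and rescheduling on the same node; the final assertion on the negative-resource meter fails if any overcommit occurs. The meter is a Dropwizard Metrics Meter (which Storm's StormMetricsRegistry builds on), so the check pattern in isolation looks roughly like this sketch (hypothetical meter name; not the exact Storm wiring):

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class NegativeResourceMeterSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        Meter negativeResourceEvents = registry.meter("negative-resource-events");

        // A scheduler would mark the meter whenever a node's remaining
        // resources drop below zero after an assignment.
        double remainingMemMb = 200.0 - 500.0;   // overcommitted by 300 MB
        if (remainingMemMb < 0) {
            negativeResourceEvents.mark();
        }

        // The regression test asserts this count stays at 0 after the fix.
        System.out.println("negative resource events: "
                + negativeResourceEvents.getCount());
    }
}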
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index b10ad91..31d8ecb 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -123,22 +123,22 @@
         double memoryOnHeap = 10;
         double memoryOffHeap = 10;
         double sharedOnHeapWithinWorker = 450;
-        double sharedOffHeapNode = 600;
+        double sharedOffHeapWithinNode = 600;
         double sharedOffHeapWithinWorker = 400;
 
         TopologyBuilder builder = new TopologyBuilder();
         switch (memoryType) {
             case SHARED_OFF_HEAP_NODE:
                 builder.setSpout("spout", new TestSpout(), spoutParallelism)
-                        .addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "spout shared off heap node"));
+                        .addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "spout shared off heap within node"));
                 break;
             case SHARED_OFF_HEAP_WORKER:
                 builder.setSpout("spout", new TestSpout(), spoutParallelism)
-                        .addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "spout shared off heap worker"));
+                        .addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "spout shared off heap within worker"));
                 break;
             case SHARED_ON_HEAP_WORKER:
                 builder.setSpout("spout", new TestSpout(), spoutParallelism)
-                        .addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "spout shared on heap worker"));
+                        .addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "spout shared on heap within worker"));
                 break;
         }
         StormTopology stormToplogy = builder.createTopology();
@@ -168,9 +168,9 @@
             case SHARED_OFF_HEAP_NODE:
                 // 4 workers on single node. OffHeapNode memory is shared
                 assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
-                assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap + sharedOffHeapNode, 0.01));
+                assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap + sharedOffHeapWithinNode, 0.01));
                 assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(0, 0.01));
-                assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(sharedOffHeapNode, 0.01));
+                assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(sharedOffHeapWithinNode, 0.01));
                 assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(spoutParallelism * memoryOnHeap, 0.01));
                 assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(spoutParallelism * memoryOffHeap, 0.01));
                 assertThat(numNodes, is(1L));
@@ -201,6 +201,75 @@
         }
     }
 
+    /**
+     * Test that scheduling does not cause negative resources.
+     */
+    @Test
+    public void testSchedulingNegativeResources() {
+        int spoutParallelism = 2;
+        int boltParallelism = 2;
+        double cpuPercent = 10;
+        double memoryOnHeap = 10;
+        double memoryOffHeap = 10;
+        double sharedOnHeapWithinWorker = 400;
+        double sharedOffHeapWithinNode = 700;
+        double sharedOffHeapWithinWorker = 500;
+
+        Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
+        TopologyDetails[] topo = new TopologyDetails[2];
+
+        // 1st topology
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestSpout(),
+                spoutParallelism);
+        builder.setBolt("bolt-1", new TestBolt(),
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
+        builder.setBolt("bolt-2", new TestBolt(),
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
+        builder.setBolt("bolt-3", new TestBolt(),
+                boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
+        StormTopology stormTopology = builder.createTopology();
+
+        conf.put(Config.TOPOLOGY_PRIORITY, 1);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology-0");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+        topo[0] = new TopologyDetails("testTopology-id-0", conf, stormTopology, 0,
+                genExecsAndComps(stormTopology), CURRENT_TIME, "user");
+
+        // 2nd topology
+        builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestSpout(),
+                spoutParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "spout shared off heap within node"));
+        stormTopology = builder.createTopology();
+
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology-1");
+        topo[1] = new TopologyDetails("testTopology-id-1", conf, stormTopology, 0,
+                genExecsAndComps(stormTopology), CURRENT_TIME, "user");
+
+        Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 500, 2000);
+        Topologies topologies = new Topologies(topo[0]);
+        Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+        // schedule 1st topology
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+        assertTopologiesFullyScheduled(cluster, topo[0].getName());
+
+        // attempt to schedule both topologies.
+        // before the fix, this triggered a negative resource event: the second topology was incorrectly scheduled with the first still in place.
+        // the first topology should be evicted so the higher-priority (lower value) second topology can be fully scheduled.
+        topologies = new Topologies(topo[0], topo[1]);
+        cluster = new Cluster(cluster, topologies);
+        scheduler.schedule(topologies, cluster);
+        assertTopologiesNotScheduled(cluster, topo[0].getName());
+        assertTopologiesFullyScheduled(cluster, topo[1].getName());
+
+        // verify that no negative resource events were recorded
+        assertThat(cluster.getResourceMetrics().getNegativeResourceEventsMeter().getCount(), is(0L));
+    }
+
     /**
      * test if the scheduling shared memory is correct with/without oneExecutorPerWorker enabled
      */
@@ -213,19 +282,19 @@
         double cpuPercent = 10;
         double memoryOnHeap = 10;
         double memoryOffHeap = 10;
-        double sharedOnHeap = 400;
-        double sharedOffHeapNode = 700;
-        double sharedOffHeapWorker = 600;
+        double sharedOnHeapWithinWorker = 400;
+        double sharedOffHeapWithinNode = 700;
+        double sharedOffHeapWithinWorker = 600;
 
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("spout", new TestSpout(),
                 spoutParallelism);
         builder.setBolt("bolt-1", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker, "bolt-1 shared off heap worker")).shuffleGrouping("spout");
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
         builder.setBolt("bolt-2", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "bolt-2 shared node")).shuffleGrouping("bolt-1");
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
         builder.setBolt("bolt-3", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeap, "bolt-3 shared worker")).shuffleGrouping("bolt-2");
+                boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
 
         StormTopology stormToplogy = builder.createTopology();
 
@@ -269,32 +338,32 @@
             // Everything should fit in a single slot
             int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
             double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
-            double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
-            double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
+            double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
+            double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker;
 
             SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
             assertThat(assignment.getSlots().size(), is(1));
             WorkerSlot ws = assignment.getSlots().iterator().next();
             String nodeId = ws.getNodeId();
             assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().size(), is(1));
-            assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapNode, 0.01));
+            assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapWithinNode, 0.01));
             assertThat(assignment.getScheduledResources().size(), is(1));
             WorkerResources resources = assignment.getScheduledResources().get(ws);
             assertThat(resources.get_cpu(), closeTo(totalExpectedCPU, 0.01));
             assertThat(resources.get_mem_on_heap(), closeTo(totalExpectedOnHeap, 0.01));
             assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
-            assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeap, 0.01));
-            assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWorker, 0.01));
+            assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeapWithinWorker, 0.01));
+            assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWithinWorker, 0.01));
         } else {
             // one worker per executor
             int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
             TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
 
             // get expected mem on topology rather than per executor
-            double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeap;
-            double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
-            double expectedMemSharedOnHeap = 2 * sharedOnHeap;
-            double expectedMemSharedOffHeap = 2 * sharedOffHeapWorker + 2 * sharedOffHeapNode;
+            double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeapWithinWorker;
+            double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWithinWorker + 2 * sharedOffHeapWithinNode;
+            double expectedMemSharedOnHeap = 2 * sharedOnHeapWithinWorker;
+            double expectedMemSharedOffHeap = 2 * sharedOffHeapWithinWorker + 2 * sharedOffHeapWithinNode;
             double expectedMemNonSharedOnHeap = totalNumberOfTasks * memoryOnHeap;
             double expectedMemNonSharedOffHeap = totalNumberOfTasks * memoryOffHeap;
             assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(expectedMemOnHeap, 0.01));