Continuation of Myriad 171, started to work towards public methods incrementResources and decrementResources, as it's easier to reason about purely additve functions in multithreaded environments.  Fixed minor bugs in previous Myriad-171 patch, placed the guard for Node Managers having negative resources in setNodeCapacity and reverted back any from calling yarnScheduler.updateNode directly.  Very well tested.

Todo: Figure out how to make setNodeCapacity private.
JIRA:
  [Myriad-171] https://issues.apache.org/jira/browse/MYRIAD-171
Pull Request:
  Closes #70
Author:
  DarinJ <darinj@apache.org>
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 1dee5fa..e922fc6 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -24,6 +24,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.inject.Inject;
 
 import org.apache.hadoop.yarn.api.records.Container;
@@ -77,6 +79,7 @@
   private final OfferLifecycleManager offerLifecycleMgr;
   private final NodeStore nodeStore;
   private final SchedulerState state;
+  private static final Lock yarnSchedulerLock = new ReentrantLock();
   private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
   private TaskUtils taskUtils;
 
@@ -123,7 +126,7 @@
     }
   }
 
-  private synchronized void removeYarnTask(RMContainer rmContainer) {
+  private void removeYarnTask(RMContainer rmContainer) {
     if (rmContainer != null && rmContainer.getContainer() != null) {
       Protos.TaskID taskId = containerToTaskId(rmContainer);
       //TODO (darinj) Reliable messaging
@@ -134,8 +137,7 @@
       if (node != null) {
         RMNode rmNode = node.getNode().getRMNode();
         Resource resource = rmContainer.getContainer().getResource();
-        Resource diff = ResourceUtils.componentwiseMax(ZERO_RESOURCE, Resources.subtract(rmNode.getTotalCapability(), resource));
-        setNodeCapacity(rmNode, diff);
+        decrementNodeCapacity(rmNode, resource);
         LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and {} mem.", rmContainer.getContainer().toString(),
             rmContainer.getContainerExitStatus(), resource.getVirtualCores(), resource.getMemory());
       } else {
@@ -206,8 +208,7 @@
       for (Protos.Offer offer : consumedOffer.getOffers()) {
         offerLifecycleMgr.declineOffer(offer);
       }
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers(
-          consumedOffer.getOffers())));
+      decrementNodeCapacity(rmNode, OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()));
     } else {
       LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size());
 
@@ -223,8 +224,7 @@
       // Reduce node capacity to account for unused offers
       Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
       Resource resUnused = Resources.subtract(resOffered, resUsed);
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused));
-
+      decrementNodeCapacity(rmNode, resUnused);
       myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
     }
 
@@ -232,6 +232,15 @@
     node.removeContainerSnapshot();
   }
 
+
+  public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) {
+    setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), addedCapacity));
+  }
+
+  public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) {
+    setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), removedCapacity));
+  }
+
   /**
    * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity.
    * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler.
@@ -243,19 +252,34 @@
   @SuppressWarnings("unchecked")
   public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
     //NOOP prevent YARN warning changing to same size
-    if (!Resources.equals(rmNode.getTotalCapability(), newCapacity)) {
-      rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
-      rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-      LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
-      // updates the scheduler with the new capacity for the NM.
-      synchronized (yarnScheduler) {
-        if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) != null) {
-          yarnScheduler.updateNodeResource(rmNode,
-              ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
-        } else {
-          LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID());
+    if ((Resources.equals(rmNode.getTotalCapability(), newCapacity))) {
+      return;
+    }
+    if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) == null) {
+      LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID());
+      return;
+    }
+    yarnSchedulerLock.lock();
+    try {
+      if (newCapacity.getMemory() < 0 || newCapacity.getVirtualCores() < 0) {
+        Resource zeroed = ResourceUtils.componentwiseMax(ZERO_RESOURCE, newCapacity);
+        rmNode.getTotalCapability().setMemory(zeroed.getMemory());
+        rmNode.getTotalCapability().setVirtualCores(zeroed.getVirtualCores());
+        LOGGER.warn("Asked to set Node {} to a value less than zero!  Had {}, setting to {}.",
+            rmNode.getHttpAddress(), rmNode.getTotalCapability().toString(), zeroed.toString());
+      } else {
+        rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
+        rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
         }
       }
+      // updates the scheduler with the new capacity for the NM.
+      // the event is handled by the scheduler asynchronously
+      rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(
+          rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    } finally {
+      yarnSchedulerLock.unlock();
     }
   }
 
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
index 5d59c68..f7d8c43 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
@@ -117,7 +117,7 @@
         then:
         zeroNM.getTotalCapability().getMemory() == 2048
         zeroNM.getTotalCapability().getVirtualCores() == 2
-        1 * yarnScheduler.updateNodeResource( _ as RMNode, _ as ResourceOption)
+        1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent)
     }
 
     YarnNodeCapacityManager getYarnNodeCapacityManager() {