Merge branch 'STORM-357' of https://github.com/d2r/incubator-storm into STORM-357

STORM-357: Cleans workers-users file only when rmr is successful
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
index 2bc2cee..1c601ca 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
@@ -163,8 +163,10 @@
    * @param ws the slot to free
    * @param cluster the cluster to update
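+   * @param forceFree if true, free the slot even when this node does not track it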
    */
-  public void free(WorkerSlot ws, Cluster cluster) {
+  public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
     if (_freeSlots.contains(ws)) return;
+    boolean wasFound = false;
     for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
       Set<WorkerSlot> slots = entry.getValue();
       if (slots.remove(ws)) {
@@ -172,11 +174,21 @@
         if (_isAlive) {
           _freeSlots.add(ws);
         }
-        return;
+        wasFound = true;
       }
     }
-    throw new IllegalArgumentException("Tried to free a slot that was not" +
-    		" part of this node " + _nodeId);
+    if (!wasFound) {
+      if (forceFree) {
+        LOG.info("Forcefully freeing slot " + ws);
+        cluster.freeSlot(ws);
+        _freeSlots.add(ws);
+      } else {
+        throw new IllegalArgumentException("Tried to free a slot that was not" +
+            " part of this node " + _nodeId);
+      }
+    }
   }
    
   /**
@@ -301,7 +313,8 @@
         }
         if (node.assignInternal(ws, topId, true)) {
           LOG.warn("Bad scheduling state, "+ws+" assigned multiple workers, unassigning everything...");
-          node.free(ws, cluster);
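+          // Force the free: the inconsistent slot may not be tracked by this node.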
+          node.free(ws, cluster, true);
         }
       }
     }
diff --git a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
index 4e79240..b3cdb13 100644
--- a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
@@ -679,6 +679,44 @@
     (is (= "Scheduled Isolated on 5 Nodes" (.get (.getStatusMap cluster) "topology3")))
 ))
 
+(deftest test-force-free-slot-in-bad-state
+  (let [supers (gen-supervisors 1)
+        topology1 (TopologyDetails. "topology1"
+                                    {TOPOLOGY-NAME "topology-name-1"
+                                     TOPOLOGY-SUBMITTER-USER "userC"}
+                                    (StormTopology.)
+                                    4
+                                    (mk-ed-map [["spout1" 0 5]
+                                                ["bolt1" 5 10]
+                                                ["bolt2" 10 15]
+                                                ["bolt3" 15 20]]))
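+        ;; a deliberately inconsistent starting assignment: recovering from it
+        ;; exercises the forced free in Node/free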
+        existing-assignments {
+                               "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1)
+                                                                                  (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20)
+                                                                                  (ExecutorDetails. 10 15) (WorkerSlot. "super0" 1)
+                                                                                  (ExecutorDetails. 15 20) (WorkerSlot. "super0" 1)})
+                               }
+        cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments)
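+        ;; building the node map should repair the bad state instead of throwing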
+        node-map (Node/getAllNodesFrom cluster)
+        topologies (Topologies. (to-top-map [topology1]))
+        conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
+        scheduler (MultitenantScheduler.)]
+    (.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
+    (.prepare scheduler conf)
+    (.schedule scheduler topologies cluster)
+    (let [assignment (.getAssignmentById cluster "topology1")
+          assigned-slots (.getSlots assignment)
+          executors (.getExecutors assignment)]
+      (log-message "Executors are:" executors)
+      ;; 4 slots on 1 machine, all executors assigned
+      (is (= 4 (.size assigned-slots)))
+      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+      )
+    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+    ))
 
 (deftest test-multitenant-scheduler-bad-starting-state
   (let [supers (gen-supervisors 10)