Merge branch 'STORM-357' of https://github.com/d2r/incubator-storm into STORM-357
STORM-357: Cleans workers-users file only when rmr is successful
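
The Node.java change below also adds a forceFree flag to Node.free so the
multitenant scheduler can recover from a bad scheduling state. A minimal usage
sketch based on the hunks that follow; node and ws are placeholder names, not
identifiers introduced by this patch:

    // Normal path: throws IllegalArgumentException if the slot is not
    // tracked by this node.
    node.free(ws, cluster, false);

    // Recovery path: force the free when unassigning a slot found in a bad
    // state; the slot is released in the cluster and added back to the
    // node's free list even though the node was not tracking it.
    node.free(ws, cluster, true);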
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
index 2bc2cee..1c601ca 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
@@ -163,8 +163,9 @@
    * @param ws the slot to free
    * @param cluster the cluster to update
    */
-  public void free(WorkerSlot ws, Cluster cluster) {
+  public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
     if (_freeSlots.contains(ws)) return;
+    boolean wasFound = false;
     for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
       Set<WorkerSlot> slots = entry.getValue();
       if (slots.remove(ws)) {
@@ -172,11 +173,19 @@
         if (_isAlive) {
           _freeSlots.add(ws);
         }
-        return;
+        wasFound = true;
       }
     }
-    throw new IllegalArgumentException("Tried to free a slot that was not" +
-        " part of this node " + _nodeId);
+    if (!wasFound) {
+      if (forceFree) {
+        LOG.info("Forcefully freeing " + ws);
+        cluster.freeSlot(ws);
+        _freeSlots.add(ws);
+      } else {
+        throw new IllegalArgumentException("Tried to free a slot that was not" +
+            " part of this node " + _nodeId);
+      }
+    }
   }
 
   /**
@@ -301,7 +310,7 @@
         }
         if (node.assignInternal(ws, topId, true)) {
           LOG.warn("Bad scheduling state, "+ws+" assigned multiple workers, unassigning everything...");
-          node.free(ws, cluster);
+          node.free(ws, cluster, true);
         }
       }
     }
diff --git a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
index 4e79240..b3cdb13 100644
--- a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj
@@ -679,6 +679,41 @@
(is (= "Scheduled Isolated on 5 Nodes" (.get (.getStatusMap cluster) "topology3")))
))
+(deftest test-force-free-slot-in-bad-state
+  (let [supers (gen-supervisors 1)
+        topology1 (TopologyDetails. "topology1"
+                    {TOPOLOGY-NAME "topology-name-1"
+                     TOPOLOGY-SUBMITTER-USER "userC"}
+                    (StormTopology.)
+                    4
+                    (mk-ed-map [["spout1" 0 5]
+                                ["bolt1" 5 10]
+                                ["bolt2" 10 15]
+                                ["bolt3" 15 20]]))
+        existing-assignments {
+          "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1)
+                                                             (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20)
+                                                             (ExecutorDetails. 10 15) (WorkerSlot. "super0" 1)
+                                                             (ExecutorDetails. 15 20) (WorkerSlot. "super0" 1)})
+        }
+        cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments)
+        node-map (Node/getAllNodesFrom cluster)
+        topologies (Topologies. (to-top-map [topology1]))
+        conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 5 "userB" 5}}
+        scheduler (MultitenantScheduler.)]
+    (.assign (.get node-map "super0") "topology1" (list (ed 1)) cluster)
+    (.prepare scheduler conf)
+    (.schedule scheduler topologies cluster)
+    (let [assignment (.getAssignmentById cluster "topology1")
+          assigned-slots (.getSlots assignment)
+          executors (.getExecutors assignment)]
+      (log-message "Executors are:" executors)
+      ;; 4 slots on 1 machine, all executors assigned
+      (is (= 4 (.size assigned-slots)))
+      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
+      )
+    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+  ))
 (deftest test-multitenant-scheduler-bad-starting-state
   (let [supers (gen-supervisors 10)