blob: 8ab844ddd88a55e89a84cd9663f25f356894df3d [file] [log] [blame]
(ns backtype.storm.nimbus-test
(:use [clojure test])
(:use [clojure.contrib.def :only [defnk]])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
;; NOTE(review): presumably expands to require/import of the core storm
;; namespaces (macro :use'd from backtype.storm bootstrap above) — confirm
(bootstrap)
(defn storm-component-info
  "Returns a map from component id to the collection of task ids that
   belong to it, for the storm registered under storm-name."
  [state storm-name]
  (let [storm-id (get-storm-id state storm-name)
        task->component (storm-task-info state storm-id)]
    (reverse-map task->component)))
(defn storm-num-workers
  "Number of distinct worker slots (node+port pairs) in the current
   assignment of the storm registered under storm-name."
  [state storm-name]
  (let [storm-id (get-storm-id state storm-name)]
    (->> (.assignment-info state storm-id nil)
         :task->node+port
         reverse-map
         count)))
(defn do-task-heartbeat
  "Records a fresh heartbeat (timestamped with the current, possibly
   simulated, time) for the given task of the given storm."
  [cluster storm-id task-id]
  (-> cluster
      :storm-cluster-state
      (.task-heartbeat! storm-id task-id
                        (TaskHeartbeat. (current-time-secs) 10 {}))))
(defn task-assignment
  "The [node port] slot the task is currently assigned to, or nil if
   it has no assignment."
  [cluster storm-id task-id]
  (let [assignment (-> cluster
                       :storm-cluster-state
                       (.assignment-info storm-id nil))]
    (get (:task->node+port assignment) task-id)))
(defn slot-assignments
  "Map from each assigned [node port] slot to the collection of task
   ids running on it."
  [cluster storm-id]
  (-> cluster
      :storm-cluster-state
      (.assignment-info storm-id nil)
      :task->node+port
      reverse-map))
(defn task-start-times
  "Map from task id to its start time (in seconds) for the storm's
   current assignment."
  [cluster storm-id]
  (-> cluster
      :storm-cluster-state
      (.assignment-info storm-id nil)
      :task->start-time-secs))
(defnk check-consistency
  ;; Asserts invariants of the named storm's current assignment:
  ;; - when :assigned? is true (default), every task id has a slot
  ;; - every assigned slot value is non-nil
  ;; - :node->host covers exactly the set of nodes that appear in slots
  ;; - every assigned task has a recorded start time
  [cluster storm-name :assigned? true]
  (let [state (:storm-cluster-state cluster)
        storm-id (get-storm-id state storm-name)
        task-ids (.task-ids state storm-id)
        assignment (.assignment-info state storm-id nil)
        task->node+port (:task->node+port assignment)
        task->start (:task->start-time-secs assignment)
        all-nodes (set (map first (vals task->node+port)))]
    (when assigned?
      (is (= (set task-ids) (set (keys task->node+port)))))
    ;; the original iterated task->node+port twice with separate doseq
    ;; loops; a single pass performs the same per-entry assertions
    (doseq [[t s] task->node+port]
      (is (not-nil? s))
      (is (not-nil? (task->start t))))
    (is (= all-nodes (set (keys (:node->host assignment)))))
    ))
(deftest test-assignment
  ;; Checks task counts per component and worker counts for two
  ;; topologies submitted to the same local cluster.
  (with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKERS 0}]
    (let [state (:storm-cluster-state cluster)
          nimbus (:nimbus cluster)
          topology (thrift/mk-topology
                     {1 (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)}
                     {2 (thrift/mk-bolt-spec {1 :none} (TestPlannerBolt.) :parallelism-hint 4)
                      3 (thrift/mk-bolt-spec {2 :none} (TestPlannerBolt.))})
          topology2 (thrift/mk-topology
                      {1 (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 12)}
                      {2 (thrift/mk-bolt-spec {1 :none} (TestPlannerBolt.) :parallelism-hint 6)
                       3 (thrift/mk-bolt-spec {1 :global} (TestPlannerBolt.) :parallelism-hint 8)
                       4 (thrift/mk-bolt-spec {1 :global 2 :none} (TestPlannerBolt.) :parallelism-hint 4)}
                      )
          _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 4} topology)
          task-info (storm-component-info state "mystorm")]
      (check-consistency cluster "mystorm")
      ;; 3 should be assigned once (if it were optimized, we'd have
      ;; different topology)
      (is (= 1 (count (.assignments state nil))))
      ;; NOTE(review): spout 1 has hint 3 but only 1 task — presumably
      ;; (TestPlannerSpout. false) makes it non-distributed; confirm
      (is (= 1 (count (task-info 1))))
      (is (= 4 (count (task-info 2))))
      (is (= 1 (count (task-info 3))))
      (is (= 4 (storm-num-workers state "mystorm")))
      (submit-local-topology nimbus "storm2" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20} topology2)
      (check-consistency cluster "storm2")
      (is (= 2 (count (.assignments state nil))))
      (let [task-info (storm-component-info state "storm2")]
        (is (= 12 (count (task-info 1))))
        (is (= 6 (count (task-info 2))))
        ;; NOTE(review): bolt 3 has hint 8 but a :global input and only
        ;; 1 task — presumably :global forces single-task; confirm
        (is (= 1 (count (task-info 3))))
        (is (= 4 (count (task-info 4))))
        (is (= 8 (storm-num-workers state "storm2")))
        )
      )))
(deftest test-over-parallelism-assignment
  ;; Submits a topology whose total task count (21 + 9 + 2 + 10 = 42)
  ;; far exceeds the requested 7 workers, and checks that every hint is
  ;; still honored and exactly 7 workers are used.
  (with-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKERS 0}]
    (let [state (:storm-cluster-state cluster)
          nimbus (:nimbus cluster)
          topology (thrift/mk-topology
                     {1 (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 21)}
                     {2 (thrift/mk-bolt-spec {1 :none} (TestPlannerBolt.) :parallelism-hint 9)
                      3 (thrift/mk-bolt-spec {1 :none} (TestPlannerBolt.) :parallelism-hint 2)
                      4 (thrift/mk-bolt-spec {1 :none} (TestPlannerBolt.) :parallelism-hint 10)}
                     )
          _ (submit-local-topology nimbus "test" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 7} topology)
          task-info (storm-component-info state "test")]
      (check-consistency cluster "test")
      (is (= 21 (count (task-info 1))))
      (is (= 9 (count (task-info 2))))
      (is (= 2 (count (task-info 3))))
      (is (= 10 (count (task-info 4))))
      (is (= 7 (storm-num-workers state "test")))
      )))
(deftest test-kill-storm
  ;; Exercises topology-kill semantics: the storm base is removed
  ;; immediately on kill, while assignments and heartbeat dirs linger
  ;; until the kill wait / task timeouts elapse.
  (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
                                      :daemon-conf {SUPERVISOR-ENABLE false
                                                    NIMBUS-TASK-TIMEOUT-SECS 30
                                                    NIMBUS-MONITOR-FREQ-SECS 10
                                                    TOPOLOGY-ACKERS 0}]
    (letlocals
      (bind conf (:daemon-conf cluster))
      (bind topology (thrift/mk-topology
                       {1 (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 14)}
                       {}
                       ))
      (bind state (:storm-cluster-state cluster))
      (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
      (bind storm-id (get-storm-id state "test"))
      (advance-cluster-time cluster 5)
      (is (not-nil? (.storm-base state storm-id nil)))
      (is (not-nil? (.assignment-info state storm-id nil)))
      (.killTopology (:nimbus cluster) "test")
      ;; killed: the storm base goes away immediately, but the
      ;; assignment survives for the wait period (message timeout = 20s)
      (is (nil? (.storm-base state storm-id nil)))
      (is (not-nil? (.assignment-info state storm-id nil)))
      (advance-cluster-time cluster 18)
      ;; 18s after kill: still within the wait period, nothing cleaned
      (is (= 1 (count (.task-storms state))))
      (is (= 1 (count (.heartbeat-storms state))))
      (advance-cluster-time cluster 3)
      ;; wait period expired: all state for the storm is cleaned up
      (is (nil? (.storm-base state storm-id nil)))
      (is (nil? (.assignment-info state storm-id nil)))
      (is (empty? (.task-storms state)))
      (is (empty? (.heartbeat-storms state)))
      ;; TODO: check that code on nimbus was cleaned up locally...
      (is (thrown? NotAliveException (.killTopology (:nimbus cluster) "lalala")))
      (submit-local-topology (:nimbus cluster) "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology)
      (is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
      (bind storm-id (get-storm-id state "2test"))
      (is (not-nil? (.storm-base state storm-id nil)))
      (.killTopology (:nimbus cluster) "2test")
      ;; resubmitting under the same name while the old storm winds down
      ;; produces a brand new storm id
      (submit-local-topology (:nimbus cluster) "2test" {} topology)
      (bind storm-id2 (get-storm-id state "2test"))
      (is (not= storm-id storm-id2))
      (is (not-nil? (.storm-base state storm-id2 nil)))
      (is (nil? (.storm-base state storm-id nil)))
      (is (not-nil? (.assignment-info state storm-id nil)))
      (is (= 2 (count (.task-storms state))))
      (is (= 2 (count (.heartbeat-storms state))))
      (advance-cluster-time cluster 11)
      ;; old storm's wait (10s) expired; only the new storm remains
      (is (nil? (.assignment-info state storm-id nil)))
      (is (not-nil? (.storm-base state storm-id2 nil)))
      (is (not-nil? (.assignment-info state storm-id2 nil)))
      (is (= 1 (count (.task-storms state))))
      (is (= 1 (count (.heartbeat-storms state))))
      (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
      (bind storm-id3 (get-storm-id state "test3"))
      (advance-cluster-time cluster 1)
      (.deactivate-storm! state storm-id3)
      ;; deactivated storms are cleaned up like killed ones
      (is (nil? (.storm-base state storm-id3 nil)))
      (is (not-nil? (.assignment-info state storm-id3 nil)))
      (is (= 2 (count (.task-storms state))))
      (is (= 2 (count (.heartbeat-storms state))))
      (advance-cluster-time cluster (+ 5 1 (conf NIMBUS-MONITOR-FREQ-SECS)))
      (is (nil? (.assignment-info state storm-id3 nil)))
      (is (= 1 (count (.task-storms state))))
      (is (= 1 (count (.heartbeat-storms state))))
      ;; test that it doesn't clean up heartbeats until all tasks have timed out
      (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
      (bind storm-id3 (get-storm-id state "test3"))
      (bind task-id (first (.task-ids state storm-id3)))
      ;; BUGFIX: heartbeat must be recorded against storm-id3 (the storm
      ;; task-id was taken from), not the stale storm-id bound above for
      ;; the long-dead "2test" storm
      (do-task-heartbeat cluster storm-id3 task-id)
      (.killTopology (:nimbus cluster) "test3")
      (advance-cluster-time cluster 6)
      ;; assignment is gone after the 5s wait, but the heartbeat dir lingers
      (is (= 1 (count (.task-storms state))))
      (is (= 2 (count (.heartbeat-storms state))))
      (advance-cluster-time cluster 10)
      ;; 16s since the heartbeat: under NIMBUS-TASK-TIMEOUT-SECS (30), kept
      (is (= 2 (count (.heartbeat-storms state))))
      (advance-cluster-time cluster 30)
      ;; heartbeat has now timed out, so the heartbeat dir is cleaned
      (is (= 1 (count (.heartbeat-storms state))))
      )))
(deftest test-reassignment
  ;; Exercises nimbus's reassignment logic under simulated time:
  ;; heartbeating tasks stay put, silent tasks get reassigned after
  ;; NIMBUS-TASK-TIMEOUT-SECS (20), fresh launches get
  ;; NIMBUS-TASK-LAUNCH-SECS (60) of grace, and tasks are moved off
  ;; dead supervisors once their heartbeats stop.
  (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
                                      :daemon-conf {SUPERVISOR-ENABLE false
                                                    NIMBUS-TASK-LAUNCH-SECS 60
                                                    NIMBUS-TASK-TIMEOUT-SECS 20
                                                    NIMBUS-MONITOR-FREQ-SECS 10
                                                    NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
                                                    TOPOLOGY-ACKERS 0}]
    (letlocals
      (bind conf (:daemon-conf cluster))
      (bind topology (thrift/mk-topology
                       {1 (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
                       {}
                       ))
      (bind state (:storm-cluster-state cluster))
      (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
      (check-consistency cluster "test")
      (bind storm-id (get-storm-id state "test"))
      (bind [task-id1 task-id2] (.task-ids state storm-id))
      (bind ass1 (task-assignment cluster storm-id task-id1))
      (bind ass2 (task-assignment cluster storm-id task-id2))
      ;; heartbeat both tasks just before the launch grace runs out;
      ;; neither should move
      (advance-cluster-time cluster 59)
      (do-task-heartbeat cluster storm-id task-id1)
      (do-task-heartbeat cluster storm-id task-id2)
      (advance-cluster-time cluster 13)
      (is (= ass1 (task-assignment cluster storm-id task-id1)))
      (is (= ass2 (task-assignment cluster storm-id task-id2)))
      ;; from here on only task 1 keeps heartbeating
      (do-task-heartbeat cluster storm-id task-id1)
      (advance-cluster-time cluster 11)
      (do-task-heartbeat cluster storm-id task-id1)
      (is (= ass1 (task-assignment cluster storm-id task-id1)))
      (check-consistency cluster "test")
      ; have to wait an extra 10 seconds because nimbus may not
      ; resynchronize its heartbeat time till monitor-time secs after
      (advance-cluster-time cluster 11)
      (do-task-heartbeat cluster storm-id task-id1)
      (is (= ass1 (task-assignment cluster storm-id task-id1)))
      (check-consistency cluster "test")
      ;; task 2's heartbeat has timed out, so only task 2 is reassigned
      (advance-cluster-time cluster 11)
      (is (= ass1 (task-assignment cluster storm-id task-id1)))
      (is (not= ass2 (task-assignment cluster storm-id task-id2)))
      (bind ass2 (task-assignment cluster storm-id task-id2))
      (check-consistency cluster "test")
      (advance-cluster-time cluster 31)
      (is (not= ass1 (task-assignment cluster storm-id task-id1)))
      (is (= ass2 (task-assignment cluster storm-id task-id2))) ; tests launch timeout
      (check-consistency cluster "test")
      (bind ass1 (task-assignment cluster storm-id task-id1))
      (bind active-supervisor (first ass2))
      (kill-supervisor cluster active-supervisor)
      ;; keep both tasks heartbeating for 120s of simulated time
      (doseq [i (range 12)]
        (do-task-heartbeat cluster storm-id task-id1)
        (do-task-heartbeat cluster storm-id task-id2)
        (advance-cluster-time cluster 10)
        )
      ;; tests that it doesn't reassign tasks if they're heartbeating even if supervisor times out
      (is (= ass1 (task-assignment cluster storm-id task-id1)))
      (is (= ass2 (task-assignment cluster storm-id task-id2)))
      (check-consistency cluster "test")
      ;; once the heartbeats stop, both tasks move off the dead supervisor
      (advance-cluster-time cluster 30)
      (bind ass1 (task-assignment cluster storm-id task-id1))
      (bind ass2 (task-assignment cluster storm-id task-id2))
      (is (not-nil? ass1))
      (is (not-nil? ass2))
      (is (not= active-supervisor (first (task-assignment cluster storm-id task-id2))))
      (is (not= active-supervisor (first (task-assignment cluster storm-id task-id1))))
      (check-consistency cluster "test")
      ;; with no supervisors left there is nowhere to assign tasks
      (doseq [supervisor-id (.supervisors state nil)]
        (kill-supervisor cluster supervisor-id))
      (advance-cluster-time cluster 90)
      (bind ass1 (task-assignment cluster storm-id task-id1))
      (bind ass2 (task-assignment cluster storm-id task-id2))
      (is (nil? ass1))
      (is (nil? ass2))
      (check-consistency cluster "test" :assigned? false)
      ;; adding a fresh supervisor lets nimbus assign everything again
      (add-supervisor cluster)
      (advance-cluster-time cluster 11)
      (check-consistency cluster "test")
      )))
(defn check-distribution
  "Asserts that the per-slot task counts of slot-tasks match the
   expected distribution, ignoring order (compared as multi-sets)."
  [slot-tasks distribution]
  (let [actual (->> slot-tasks vals (map count) multi-set)
        expected (multi-set distribution)]
    (is (= actual expected))))
(deftest test-reassign-squeezed-topology
  ;; Starts a 9-task topology on a single 1-port supervisor (all tasks
  ;; squeezed into one slot), then grows the cluster and checks that
  ;; tasks are spread out, and that only moved tasks get new start times.
  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 1
                                      :daemon-conf {SUPERVISOR-ENABLE false
                                                    NIMBUS-TASK-LAUNCH-SECS 60
                                                    NIMBUS-TASK-TIMEOUT-SECS 20
                                                    NIMBUS-MONITOR-FREQ-SECS 10
                                                    TOPOLOGY-ACKERS 0}]
    (letlocals
      (bind topology (thrift/mk-topology
                       {1 (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 9)}
                       {}))
      (bind state (:storm-cluster-state cluster))
      (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4} topology) ; distribution should be 2, 2, 2, 3 ideally
      (bind storm-id (get-storm-id state "test"))
      ;; only one slot exists, so all 9 tasks land on it
      (bind slot-tasks (slot-assignments cluster storm-id))
      (check-distribution (slot-assignments cluster storm-id) [9])
      (check-consistency cluster "test")
      ;; 2 more ports: 3 slots total, tasks rebalanced 3/3/3
      (add-supervisor cluster :ports 2)
      (advance-cluster-time cluster 11)
      (bind slot-tasks (slot-assignments cluster storm-id))
      (bind task->start (task-start-times cluster storm-id))
      (check-distribution slot-tasks [3 3 3])
      (check-consistency cluster "test")
      (add-supervisor cluster :ports 8)
      ;; this actually works for any time > 0, since zookeeper fires an event causing immediate reassignment
      ;; doesn't work for time = 0 because it's not waiting for cluster yet, so test might happen before reassignment finishes
      (advance-cluster-time cluster 11)
      (bind slot-tasks2 (slot-assignments cluster storm-id))
      (bind task->start2 (task-start-times cluster storm-id))
      (check-distribution slot-tasks2 [2 2 2 3])
      (check-consistency cluster "test")
      ;; the 3-task slot should be one that kept its tasks from before
      (bind common (first (find-first (fn [[k v]] (= 3 (count v))) slot-tasks2)))
      (is (not-nil? common))
      (is (= (slot-tasks2 common) (slot-tasks common)))
      ;; check that start times are changed for everything but the common one
      (bind same-tasks (slot-tasks2 common))
      (bind changed-tasks (apply concat (vals (dissoc slot-tasks2 common))))
      (doseq [t same-tasks]
        (is (= (task->start t) (task->start2 t))))
      (doseq [t changed-tasks]
        (is (not= (task->start t) (task->start2 t))))
      )))
(deftest test-no-overlapping-slots
  ;; TODO (unimplemented stub):
  ;; test that same node+port never appears across 2 assignments
  )
(deftest test-stateless
  ;; TODO (unimplemented stub):
  ;; test that nimbus can die and restart without any problems
  )