blob: 0645f392334cb577be4d73765d8dc85bb8a337ce [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.nimbus-test
(:use [clojure test])
(:require [backtype.storm [util :as util] [stats :as stats]])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
(:require [backtype.storm [converter :as converter]])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestPlannerSpout TestPlannerBolt])
(:import [backtype.storm.scheduler INimbus])
(:import [backtype.storm.nimbus ILeaderElector NimbusInfo])
(:import [backtype.storm.generated Credentials NotAliveException SubmitOptions
TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions
InvalidTopologyException AuthorizationException])
(:import [java.util HashMap])
(:import [java.io File])
(:import [backtype.storm.utils Time])
(:import [org.apache.commons.io FileUtils])
(:use [backtype.storm testing MockAutoCred util config log timer zookeeper])
(:use [backtype.storm.daemon common])
(:require [conjure.core])
(:require [backtype.storm
[thrift :as thrift]
[cluster :as cluster]])
(:use [conjure core]))
(defn storm-component->task-info [cluster storm-name]
(let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)
nimbus (:nimbus cluster)]
(-> (.getUserTopology nimbus storm-id)
(storm-task-info (from-json (.getTopologyConf nimbus storm-id)))
reverse-map)))
(defn getCredentials [cluster storm-name]
(let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)]
(.credentials (:storm-cluster-state cluster) storm-id nil)))
(defn storm-component->executor-info [cluster storm-name]
(let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)
nimbus (:nimbus cluster)
storm-conf (from-json (.getTopologyConf nimbus storm-id))
topology (.getUserTopology nimbus storm-id)
task->component (storm-task-info topology storm-conf)
state (:storm-cluster-state cluster)
get-component (comp task->component first)]
(->> (.assignment-info state storm-id nil)
:executor->node+port
keys
(map (fn [e] {e (get-component e)}))
(apply merge)
reverse-map)))
(defn storm-num-workers [state storm-name]
(let [storm-id (get-storm-id state storm-name)
assignment (.assignment-info state storm-id nil)]
(count (reverse-map (:executor->node+port assignment)))
))
(defn topology-nodes [state storm-name]
(let [storm-id (get-storm-id state storm-name)
assignment (.assignment-info state storm-id nil)]
(->> assignment
:executor->node+port
vals
(map first)
set
)))
(defn topology-slots [state storm-name]
(let [storm-id (get-storm-id state storm-name)
assignment (.assignment-info state storm-id nil)]
(->> assignment
:executor->node+port
vals
set
)))
(defn topology-node-distribution [state storm-name]
(let [storm-id (get-storm-id state storm-name)
assignment (.assignment-info state storm-id nil)]
(->> assignment
:executor->node+port
vals
set
(group-by first)
(map-val count)
(map (fn [[_ amt]] {amt 1}))
(apply merge-with +)
)))
(defn topology-num-nodes [state storm-name]
(count (topology-nodes state storm-name)))
(defn executor-assignment [cluster storm-id executor-id]
(let [state (:storm-cluster-state cluster)
assignment (.assignment-info state storm-id nil)]
((:executor->node+port assignment) executor-id)
))
(defn executor-start-times [cluster storm-id]
(let [state (:storm-cluster-state cluster)
assignment (.assignment-info state storm-id nil)]
(:executor->start-time-secs assignment)))
(defn do-executor-heartbeat [cluster storm-id executor]
(let [state (:storm-cluster-state cluster)
executor->node+port (:executor->node+port (.assignment-info state storm-id nil))
[node port] (get executor->node+port executor)
curr-beat (.get-worker-heartbeat state storm-id node port)
stats (:executor-stats curr-beat)]
(.worker-heartbeat! state storm-id node port
{:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}
)))
(defn slot-assignments [cluster storm-id]
(let [state (:storm-cluster-state cluster)
assignment (.assignment-info state storm-id nil)]
(reverse-map (:executor->node+port assignment))
))
(defn task-ids [cluster storm-id]
(let [nimbus (:nimbus cluster)]
(-> (.getUserTopology nimbus storm-id)
(storm-task-info (from-json (.getTopologyConf nimbus storm-id)))
keys)))
(defn topology-executors [cluster storm-id]
(let [state (:storm-cluster-state cluster)
assignment (.assignment-info state storm-id nil)]
(keys (:executor->node+port assignment))
))
(defn check-distribution [items distribution]
(let [dist (->> items (map count) multi-set)]
(is (= dist (multi-set distribution)))
))
(defn disjoint? [& sets]
(let [combined (apply concat sets)]
(= (count combined) (count (set combined)))
))
(defnk check-consistency [cluster storm-name :assigned? true]
(let [state (:storm-cluster-state cluster)
storm-id (get-storm-id state storm-name)
task-ids (task-ids cluster storm-id)
assignment (.assignment-info state storm-id nil)
executor->node+port (:executor->node+port assignment)
task->node+port (to-task->node+port executor->node+port)
assigned-task-ids (mapcat executor-id->tasks (keys executor->node+port))
all-nodes (set (map first (vals executor->node+port)))]
(when assigned?
(is (= (sort task-ids) (sort assigned-task-ids)))
(doseq [t task-ids]
(is (not-nil? (task->node+port t)))))
(doseq [[e s] executor->node+port]
(is (not-nil? s)))
;;(map str (-> (Thread/currentThread) .getStackTrace))
(is (= all-nodes (set (keys (:node->host assignment)))))
(doseq [[e s] executor->node+port]
(is (not-nil? ((:executor->start-time-secs assignment) e))))
))
(deftest test-bogusId
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
:daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)]
(is (thrown? NotAliveException (.getTopologyConf nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopologyInfo nimbus "bogus-id")))
(is (thrown? NotAliveException (.uploadNewCredentials nimbus "bogus-id" (Credentials.))))
)))
(deftest test-assignment
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
:daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 4)
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))})
topology2 (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 12)}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 6)
"3" (thrift/mk-bolt-spec {"1" :global} (TestPlannerBolt.) :parallelism-hint 8)
"4" (thrift/mk-bolt-spec {"1" :global "2" :none} (TestPlannerBolt.) :parallelism-hint 4)}
)
_ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
;; 3 should be assigned once (if it were optimized, we'd have
;; different topology)
(is (= 1 (count (.assignments state nil))))
(is (= 1 (count (task-info "1"))))
(is (= 4 (count (task-info "2"))))
(is (= 1 (count (task-info "3"))))
(is (= 4 (storm-num-workers state "mystorm")))
(submit-local-topology nimbus "storm2" {TOPOLOGY-WORKERS 20} topology2)
(check-consistency cluster "storm2")
(is (= 2 (count (.assignments state nil))))
(let [task-info (storm-component->task-info cluster "storm2")]
(is (= 12 (count (task-info "1"))))
(is (= 6 (count (task-info "2"))))
(is (= 8 (count (task-info "3"))))
(is (= 4 (count (task-info "4"))))
(is (= 8 (storm-num-workers state "storm2")))
)
)))
(defn isolation-nimbus []
(let [standalone (nimbus/standalone-nimbus)]
(reify INimbus
(prepare [this conf local-dir]
(.prepare standalone conf local-dir)
)
(allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
(.allSlotsAvailableForScheduling standalone supervisors topologies topologies-missing-assignments))
(assignSlots [this topology slots]
(.assignSlots standalone topology slots)
)
(getForcedScheduler [this]
(.getForcedScheduler standalone))
(getHostName [this supervisors node-id]
node-id
))))
(deftest test-auto-credentials
(with-simulated-time-local-cluster [cluster :supervisors 6
:ports-per-supervisor 3
:daemon-conf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
NIMBUS-CREDENTIAL-RENEW-FREQ-SECS 10
NIMBUS-CREDENTIAL-RENEWERS (list "backtype.storm.MockAutoCred")
NIMBUS-AUTO-CRED-PLUGINS (list "backtype.storm.MockAutoCred")
}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)
topology-name "test-auto-cred-storm"
submitOptions (SubmitOptions. TopologyInitialStatus/INACTIVE)
- (.set_creds submitOptions (Credentials. (HashMap.)))
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 4)
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))})
_ (submit-local-topology-with-opts nimbus topology-name {TOPOLOGY-WORKERS 4
TOPOLOGY-AUTO-CREDENTIALS (list "backtype.storm.MockAutoCred")
} topology submitOptions)
credentials (getCredentials cluster topology-name)]
; check that the credentials have nimbus auto generated cred
(is (= (.get credentials nimbus-cred-key) nimbus-cred-val))
;advance cluster time so the renewers can execute
(advance-cluster-time cluster 20)
;check that renewed credentials replace the original credential.
(is (= (.get (getCredentials cluster topology-name) nimbus-cred-key) nimbus-cred-renew-val))
(is (= (.get (getCredentials cluster topology-name) gateway-cred-key) gateway-cred-renew-val)))))
(deftest test-isolated-assignment
(with-simulated-time-local-cluster [cluster :supervisors 6
:ports-per-supervisor 3
:inimbus (isolation-nimbus)
:daemon-conf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
STORM-SCHEDULER "backtype.storm.scheduler.IsolationScheduler"
ISOLATION-SCHEDULER-MACHINES {"tester1" 3 "tester2" 2}
NIMBUS-MONITOR-FREQ-SECS 10
}]
(letlocals
(bind state (:storm-cluster-state cluster))
(bind nimbus (:nimbus cluster))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 5)
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))}))
(submit-local-topology nimbus "noniso" {TOPOLOGY-WORKERS 4} topology)
(advance-cluster-time cluster 1)
(is (= 4 (topology-num-nodes state "noniso")))
(is (= 4 (storm-num-workers state "noniso")))
(submit-local-topology nimbus "tester1" {TOPOLOGY-WORKERS 6} topology)
(submit-local-topology nimbus "tester2" {TOPOLOGY-WORKERS 6} topology)
(advance-cluster-time cluster 1)
(bind task-info-tester1 (storm-component->task-info cluster "tester1"))
(bind task-info-tester2 (storm-component->task-info cluster "tester2"))
(is (= 1 (topology-num-nodes state "noniso")))
(is (= 3 (storm-num-workers state "noniso")))
(is (= {2 3} (topology-node-distribution state "tester1")))
(is (= {3 2} (topology-node-distribution state "tester2")))
(is (apply disjoint? (map (partial topology-nodes state) ["noniso" "tester1" "tester2"])))
(check-consistency cluster "tester1")
(check-consistency cluster "tester2")
(check-consistency cluster "noniso")
;;check that nothing gets reassigned
(bind tester1-slots (topology-slots state "tester1"))
(bind tester2-slots (topology-slots state "tester2"))
(bind noniso-slots (topology-slots state "noniso"))
(advance-cluster-time cluster 20)
(is (= tester1-slots (topology-slots state "tester1")))
(is (= tester2-slots (topology-slots state "tester2")))
(is (= noniso-slots (topology-slots state "noniso")))
)))
(deftest test-zero-executor-or-tasks
(with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3 :conf {TOPOLOGY-TASKS 0})}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 1 :conf {TOPOLOGY-TASKS 2})
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) :conf {TOPOLOGY-TASKS 5})})
_ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
(is (= 0 (count (task-info "1"))))
(is (= 2 (count (task-info "2"))))
(is (= 5 (count (task-info "3"))))
(is (= 2 (storm-num-workers state "mystorm"))) ;; because only 2 executors
)))
(deftest test-executor-assignments
(with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3 :conf {TOPOLOGY-TASKS 5})}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 8 :conf {TOPOLOGY-TASKS 2})
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) :parallelism-hint 3)})
_ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
task-info (storm-component->task-info cluster "mystorm")
executor-info (->> (storm-component->executor-info cluster "mystorm")
(map-val #(map executor-id->tasks %)))]
(check-consistency cluster "mystorm")
(is (= 5 (count (task-info "1"))))
(check-distribution (executor-info "1") [2 2 1])
(is (= 2 (count (task-info "2"))))
(check-distribution (executor-info "2") [1 1])
(is (= 3 (count (task-info "3"))))
(check-distribution (executor-info "3") [1 1 1])
)))
(deftest test-over-parallelism-assignment
(with-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
:daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 21)}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 9)
"3" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 2)
"4" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 10)}
)
_ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology)
task-info (storm-component->task-info cluster "test")]
(check-consistency cluster "test")
(is (= 21 (count (task-info "1"))))
(is (= 9 (count (task-info "2"))))
(is (= 2 (count (task-info "3"))))
(is (= 10 (count (task-info "4"))))
(is (= 7 (storm-num-workers state "test")))
)))
(deftest test-kill-storm
(with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-TASK-TIMEOUT-SECS 30
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(letlocals
(bind conf (:daemon-conf cluster))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 14)}
{}
))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
(bind storm-id (get-storm-id state "test"))
(advance-cluster-time cluster 5)
(is (not-nil? (.storm-base state storm-id nil)))
(is (not-nil? (.assignment-info state storm-id nil)))
(.killTopology (:nimbus cluster) "test")
;; check that storm is deactivated but alive
(is (= :killed (-> (.storm-base state storm-id nil) :status :type)))
(is (not-nil? (.assignment-info state storm-id nil)))
(advance-cluster-time cluster 18)
;; check that storm is deactivated but alive
(is (= 1 (count (.heartbeat-storms state))))
(advance-cluster-time cluster 3)
(is (nil? (.storm-base state storm-id nil)))
(is (nil? (.assignment-info state storm-id nil)))
;; cleanup happens on monitoring thread
(advance-cluster-time cluster 11)
(is (empty? (.heartbeat-storms state)))
;; TODO: check that code on nimbus was cleaned up locally...
(is (thrown? NotAliveException (.killTopology (:nimbus cluster) "lalala")))
(submit-local-topology (:nimbus cluster) "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology)
(is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
(bind storm-id (get-storm-id state "2test"))
(is (not-nil? (.storm-base state storm-id nil)))
(.killTopology (:nimbus cluster) "2test")
(is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
(advance-cluster-time cluster 5)
(is (= 1 (count (.heartbeat-storms state))))
(advance-cluster-time cluster 6)
(is (nil? (.storm-base state storm-id nil)))
(is (nil? (.assignment-info state storm-id nil)))
(advance-cluster-time cluster 11)
(is (= 0 (count (.heartbeat-storms state))))
(submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
(bind storm-id3 (get-storm-id state "test3"))
(advance-cluster-time cluster 1)
(.remove-storm! state storm-id3)
(is (nil? (.storm-base state storm-id3 nil)))
(is (nil? (.assignment-info state storm-id3 nil)))
(advance-cluster-time cluster 11)
(is (= 0 (count (.heartbeat-storms state))))
;; this guarantees that monitor thread won't trigger for 10 more seconds
(advance-time-secs! 11)
(wait-until-cluster-waiting cluster)
(submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
(bind storm-id3 (get-storm-id state "test3"))
(bind executor-id (first (topology-executors cluster storm-id3)))
(do-executor-heartbeat cluster storm-id3 executor-id)
(.killTopology (:nimbus cluster) "test3")
(advance-cluster-time cluster 6)
(is (= 1 (count (.heartbeat-storms state))))
(advance-cluster-time cluster 5)
(is (= 0 (count (.heartbeat-storms state))))
;; test kill with opts
(submit-local-topology (:nimbus cluster) "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
(.killTopologyWithOpts (:nimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs 10)))
(bind storm-id4 (get-storm-id state "test4"))
(advance-cluster-time cluster 9)
(is (not-nil? (.assignment-info state storm-id4 nil)))
(advance-cluster-time cluster 2)
(is (nil? (.assignment-info state storm-id4 nil)))
)))
(deftest test-reassignment
(with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(letlocals
(bind conf (:daemon-conf cluster))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
{}
))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
(check-consistency cluster "test")
(bind storm-id (get-storm-id state "test"))
(bind [executor-id1 executor-id2] (topology-executors cluster storm-id))
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(advance-cluster-time cluster 59)
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
(advance-cluster-time cluster 13)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
(do-executor-heartbeat cluster storm-id executor-id1)
(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(check-consistency cluster "test")
; have to wait an extra 10 seconds because nimbus may not
; resynchronize its heartbeat time till monitor-time secs after
(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(check-consistency cluster "test")
(advance-cluster-time cluster 11)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (not= ass2 (executor-assignment cluster storm-id executor-id2)))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(check-consistency cluster "test")
(advance-cluster-time cluster 31)
(is (not= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2))) ; tests launch timeout
(check-consistency cluster "test")
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind active-supervisor (first ass2))
(kill-supervisor cluster active-supervisor)
(doseq [i (range 12)]
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
(advance-cluster-time cluster 10)
)
;; tests that it doesn't reassign executors if they're heartbeating even if supervisor times out
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
(check-consistency cluster "test")
(advance-cluster-time cluster 30)
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(is (not-nil? ass1))
(is (not-nil? ass2))
(is (not= active-supervisor (first (executor-assignment cluster storm-id executor-id2))))
(is (not= active-supervisor (first (executor-assignment cluster storm-id executor-id1))))
(check-consistency cluster "test")
(doseq [supervisor-id (.supervisors state nil)]
(kill-supervisor cluster supervisor-id))
(advance-cluster-time cluster 90)
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(is (nil? ass1))
(is (nil? ass2))
(check-consistency cluster "test" :assigned? false)
(add-supervisor cluster)
(advance-cluster-time cluster 11)
(check-consistency cluster "test")
)))
(deftest test-reassignment-to-constrained-cluster
(with-simulated-time-local-cluster [cluster :supervisors 0
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(letlocals
(add-supervisor cluster :ports 1 :id "a")
(add-supervisor cluster :ports 1 :id "b")
(bind conf (:daemon-conf cluster))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
{}
))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
(check-consistency cluster "test")
(bind storm-id (get-storm-id state "test"))
(bind [executor-id1 executor-id2] (topology-executors cluster storm-id))
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(advance-cluster-time cluster 59)
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
(advance-cluster-time cluster 13)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
(kill-supervisor cluster "b")
(do-executor-heartbeat cluster storm-id executor-id1)
(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(check-consistency cluster "test")
(is (= 1 (storm-num-workers state "test")))
)))
(defn check-executor-distribution [slot-executors distribution]
(check-distribution (vals slot-executors) distribution))
(defn check-num-nodes [slot-executors num-nodes]
(let [nodes (->> slot-executors keys (map first) set)]
(is (= num-nodes (count nodes)))
))
(deftest test-reassign-squeezed-topology
(with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 1
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 9)}
{}))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4} topology) ; distribution should be 2, 2, 2, 3 ideally
(bind storm-id (get-storm-id state "test"))
(bind slot-executors (slot-assignments cluster storm-id))
(check-executor-distribution slot-executors [9])
(check-consistency cluster "test")
(add-supervisor cluster :ports 2)
(advance-cluster-time cluster 11)
(bind slot-executors (slot-assignments cluster storm-id))
(bind executor->start (executor-start-times cluster storm-id))
(check-executor-distribution slot-executors [3 3 3])
(check-consistency cluster "test")
(add-supervisor cluster :ports 8)
;; this actually works for any time > 0, since zookeeper fires an event causing immediate reassignment
;; doesn't work for time = 0 because it's not waiting for cluster yet, so test might happen before reassignment finishes
(advance-cluster-time cluster 11)
(bind slot-executors2 (slot-assignments cluster storm-id))
(bind executor->start2 (executor-start-times cluster storm-id))
(check-executor-distribution slot-executors2 [2 2 2 3])
(check-consistency cluster "test")
(bind common (first (find-first (fn [[k v]] (= 3 (count v))) slot-executors2)))
(is (not-nil? common))
(is (= (slot-executors2 common) (slot-executors common)))
;; check that start times are changed for everything but the common one
(bind same-executors (slot-executors2 common))
(bind changed-executors (apply concat (vals (dissoc slot-executors2 common))))
(doseq [t same-executors]
(is (= (executor->start t) (executor->start2 t))))
(doseq [t changed-executors]
(is (not= (executor->start t) (executor->start2 t))))
)))
(deftest test-rebalance
(with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 3
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} topology)
(bind storm-id (get-storm-id state "test"))
(add-supervisor cluster :ports 3)
(add-supervisor cluster :ports 3)
(advance-cluster-time cluster 91)
(bind slot-executors (slot-assignments cluster storm-id))
;; check that all workers are on one machine
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 1)
(.rebalance (:nimbus cluster) "test" (RebalanceOptions.))
(advance-cluster-time cluster 31)
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 1)
(advance-cluster-time cluster 30)
(bind slot-executors (slot-assignments cluster storm-id))
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 3)
(is (thrown? InvalidTopologyException
(.rebalance (:nimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" 0})
))))
)))
(deftest test-rebalance-change-parallelism
(with-simulated-time-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
:parallelism-hint 6
:conf {TOPOLOGY-TASKS 12})}
{}))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
(bind storm-id (get-storm-id state "test"))
(bind checker (fn [distribution]
(check-executor-distribution
(slot-assignments cluster storm-id)
distribution)))
(checker [2 2 2])
(.rebalance (:nimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_workers 6)
))
(advance-cluster-time cluster 29)
(checker [2 2 2])
(advance-cluster-time cluster 3)
(checker [1 1 1 1 1 1])
(.rebalance (:nimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" 1})
))
(advance-cluster-time cluster 29)
(checker [1 1 1 1 1 1])
(advance-cluster-time cluster 3)
(checker [1])
(.rebalance (:nimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" 8})
(.set_num_workers 4)
))
(advance-cluster-time cluster 32)
(checker [2 2 2 2])
(check-consistency cluster "test")
(bind executor-info (->> (storm-component->executor-info cluster "test")
(map-val #(map executor-id->tasks %))))
(check-distribution (executor-info "1") [2 2 2 2 1 1 1 1])
)))
(defn check-for-collisions [state]
(log-message "Checking for collision")
(let [assignments (.assignments state nil)]
(log-message "Assignemts: " assignments)
(let [id->node->ports (into {} (for [id assignments
:let [executor->node+port (:executor->node+port (.assignment-info state id nil))
node+ports (set (.values executor->node+port))
node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [[node port] node+ports] {node [port]}))]]
{id node->ports}))
_ (log-message "id->node->ports: " id->node->ports)
all-nodes (apply merge-with (fn [a b]
(let [ret (concat a b)]
(log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret))
(is (apply distinct? ret))
(distinct ret)))
(.values id->node->ports))]
)))
(deftest test-rebalance-constrained-cluster
(with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 4
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(bind topology2 (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(bind topology3 (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
(submit-local-topology (:nimbus cluster)
"test2"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
(submit-local-topology (:nimbus cluster)
"test3"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
(advance-cluster-time cluster 31)
(check-for-collisions state)
(.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.)
(.set_num_workers 4)
(.set_wait_secs 0)
))
(advance-cluster-time cluster 11)
(check-for-collisions state)
(advance-cluster-time cluster 30)
(check-for-collisions state)
)))
(deftest test-submit-invalid
(with-simulated-time-local-cluster [cluster
:daemon-conf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
NIMBUS-EXECUTORS-PER-TOPOLOGY 8
NIMBUS-SLOTS-PER-TOPOLOGY 8}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 1 :conf {TOPOLOGY-TASKS 1})}
{}))
(is (thrown? InvalidTopologyException
(submit-local-topology (:nimbus cluster)
"test/aaa"
{}
topology)))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
:parallelism-hint 16
:conf {TOPOLOGY-TASKS 16})}
{}))
(bind state (:storm-cluster-state cluster))
(is (thrown? InvalidTopologyException
(submit-local-topology (:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 3}
topology)))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true)
:parallelism-hint 5
:conf {TOPOLOGY-TASKS 5})}
{}))
(is (thrown? InvalidTopologyException
(submit-local-topology (:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 16}
topology)))
(is (nil? (submit-local-topology (:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 8}
topology))))))
(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
(let [leader-address (NimbusInfo. leader-name leader-port true)]
(reify ILeaderElector
(prepare [this conf] true)
(isLeader [this] is-leader)
(addToLeaderLockQueue [this] true)
(getLeader [this] leader-address)
(getAllNimbuses [this] `(leader-address))
(close [this] true))))
(deftest test-cleans-corrupt
(with-inprocess-zookeeper zk-port
(with-local-tmp [nimbus-dir]
(stubbing [zk-leader-elector (mock-leader-elector)]
(letlocals
(bind conf (merge (read-storm-config)
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT zk-port
STORM-LOCAL-DIR nimbus-dir}))
(bind cluster-state (cluster/mk-storm-cluster-state conf))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(submit-local-topology nimbus "t1" {} topology)
(submit-local-topology nimbus "t2" {} topology)
(bind storm-id1 (get-storm-id cluster-state "t1"))
(bind storm-id2 (get-storm-id cluster-state "t2"))
(.shutdown nimbus)
(rmr (master-stormdist-root conf storm-id1))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(is ( = #{storm-id2} (set (.active-storms cluster-state))))
(.shutdown nimbus)
(.disconnect cluster-state)
)))))
(deftest test-cleans-corrupt
(with-inprocess-zookeeper zk-port
(with-local-tmp [nimbus-dir]
(stubbing [zk-leader-elector (mock-leader-elector)]
(letlocals
(bind conf (merge (read-storm-config)
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT zk-port
STORM-LOCAL-DIR nimbus-dir}))
(bind cluster-state (cluster/mk-storm-cluster-state conf))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(submit-local-topology nimbus "t1" {} topology)
(submit-local-topology nimbus "t2" {} topology)
(bind storm-id1 (get-storm-id cluster-state "t1"))
(bind storm-id2 (get-storm-id cluster-state "t2"))
(.shutdown nimbus)
(rmr (master-stormdist-root conf storm-id1))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(is ( = #{storm-id2} (set (.active-storms cluster-state))))
(.shutdown nimbus)
(.disconnect cluster-state)
)))))
;(deftest test-no-overlapping-slots
; ;; test that same node+port never appears across 2 assignments
; )
;(deftest test-stateless
; ;; test that nimbus can die and restart without any problems
; )
(deftest test-clean-inbox
"Tests that the inbox correctly cleans jar files."
(with-simulated-time
(with-local-tmp [dir-location]
(let [dir (File. dir-location)
mk-file (fn [name seconds-ago]
(let [f (File. (str dir-location "/" name))
t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
(FileUtils/touch f)
(.setLastModified f t)))
assert-files-in-dir (fn [compare-file-names]
(let [file-names (map #(.getName %) (file-seq dir))]
(is (= (sort compare-file-names)
(sort (filter #(.endsWith % ".jar") file-names))
))))]
;; Make three files a.jar, b.jar, c.jar.
;; a and b are older than c and should be deleted first.
(advance-time-secs! 100)
(doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
(apply mk-file fs))
(assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
(nimbus/clean-inbox dir-location 10)
(assert-files-in-dir ["c.jar"])
;; Cleanit again, c.jar should stay
(advance-time-secs! 5)
(nimbus/clean-inbox dir-location 10)
(assert-files-in-dir ["c.jar"])
;; Advance time, clean again, c.jar should be deleted.
(advance-time-secs! 5)
(nimbus/clean-inbox dir-location 10)
(assert-files-in-dir [])
))))
(deftest test-leadership
"Tests that leader actions can only be performed by master and non leader fails to perform the same actions."
(with-inprocess-zookeeper zk-port
(with-local-tmp [nimbus-dir]
(stubbing [zk-leader-elector (mock-leader-elector)]
(letlocals
(bind conf (merge (read-storm-config)
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT zk-port
STORM-LOCAL-DIR nimbus-dir}))
(bind cluster-state (cluster/mk-storm-cluster-state conf))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(stubbing [zk-leader-elector (mock-leader-elector :is-leader false)]
(letlocals
(bind non-leader-cluster-state (cluster/mk-storm-cluster-state conf))
(bind non-leader-nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
;first we verify that the master nimbus can perform all actions, even with another nimbus present.
(submit-local-topology nimbus "t1" {} topology)
(.deactivate nimbus "t1")
(.activate nimbus "t1")
(.rebalance nimbus "t1" (RebalanceOptions.))
(.killTopology nimbus "t1")
;now we verify that non master nimbus can not perform any of the actions.
(is (thrown? RuntimeException
(submit-local-topology non-leader-nimbus
"failing"
{}
topology)))
(is (thrown? RuntimeException
(.killTopology non-leader-nimbus
"t1")))
(is (thrown? RuntimeException
(.activate non-leader-nimbus "t1")))
(is (thrown? RuntimeException
(.deactivate non-leader-nimbus "t1")))
(is (thrown? RuntimeException
(.rebalance non-leader-nimbus "t1" (RebalanceOptions.))))
(.shutdown non-leader-nimbus)
(.disconnect non-leader-cluster-state)
))
(.shutdown nimbus)
(.disconnect cluster-state))))))
(deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
(with-local-cluster [cluster
:daemon-conf {NIMBUS-AUTHORIZER
"backtype.storm.security.auth.authorizer.DenyAuthorizer"}]
(let [
nimbus (:nimbus cluster)
topology (thrift/mk-topology {} {})
]
(is (thrown? AuthorizationException
(submit-local-topology-with-opts nimbus "mystorm" {} topology
(SubmitOptions. TopologyInitialStatus/INACTIVE))
))
)
)
)
(deftest test-nimbus-iface-methods-check-authorization
(with-local-cluster [cluster
:daemon-conf {NIMBUS-AUTHORIZER
"backtype.storm.security.auth.authorizer.DenyAuthorizer"}]
(let [
nimbus (:nimbus cluster)
topology (thrift/mk-topology {} {})
]
; Fake good authorization as part of setup.
(mocking [nimbus/check-authorization!]
(submit-local-topology-with-opts nimbus "test" {} topology
(SubmitOptions. TopologyInitialStatus/INACTIVE))
)
(stubbing [nimbus/storm-active? true]
(is (thrown? AuthorizationException
(.rebalance nimbus "test" (RebalanceOptions.))
))
)
(is (thrown? AuthorizationException
(.activate nimbus "test")
))
(is (thrown? AuthorizationException
(.deactivate nimbus "test")
))
)
)
)
(deftest test-nimbus-check-authorization-params
(with-local-cluster [cluster
:daemon-conf {NIMBUS-AUTHORIZER "backtype.storm.security.auth.authorizer.NoopAuthorizer"}]
(let [nimbus (:nimbus cluster)
topology-name "test-nimbus-check-autho-params"
topology (thrift/mk-topology {} {})]
(submit-local-topology-with-opts nimbus topology-name {} topology
(SubmitOptions. TopologyInitialStatus/INACTIVE))
(let [expected-name topology-name
expected-conf {TOPOLOGY-NAME expected-name
:foo :bar}]
(testing "getTopologyConf calls check-authorization! with the correct parameters."
(let [expected-operation "getTopologyConf"]
(stubbing [nimbus/check-authorization! nil
nimbus/try-read-storm-conf expected-conf
util/to-json nil]
(try
(.getTopologyConf nimbus "fake-id")
(catch NotAliveException e)
(finally
(verify-first-call-args-for-indices
nimbus/check-authorization!
[1 2 3] expected-name expected-conf expected-operation)
(verify-first-call-args-for util/to-json expected-conf))))))
(testing "getTopology calls check-authorization! with the correct parameters."
(let [expected-operation "getTopology"]
(stubbing [nimbus/check-authorization! nil
nimbus/try-read-storm-conf expected-conf
nimbus/try-read-storm-topology nil
system-topology! nil]
(try
(.getTopology nimbus "fake-id")
(catch NotAliveException e)
(finally
(verify-first-call-args-for-indices
nimbus/check-authorization!
[1 2 3] expected-name expected-conf expected-operation)
(verify-first-call-args-for-indices
system-topology! [0] expected-conf))))))
(testing "getUserTopology calls check-authorization with the correct parameters."
(let [expected-operation "getUserTopology"]
(stubbing [nimbus/check-authorization! nil
nimbus/try-read-storm-conf expected-conf
nimbus/try-read-storm-topology nil]
(try
(.getUserTopology nimbus "fake-id")
(catch NotAliveException e)
(finally
(verify-first-call-args-for-indices
nimbus/check-authorization!
[1 2 3] expected-name expected-conf expected-operation)
(verify-first-call-args-for-indices
nimbus/try-read-storm-topology [0] expected-conf))))))))))
(deftest test-nimbus-iface-getTopology-methods-throw-correctly
(with-local-cluster [cluster]
(let [
nimbus (:nimbus cluster)
id "bogus ID"
]
(is (thrown? NotAliveException (.getTopology nimbus id)))
(try
(.getTopology nimbus id)
(catch NotAliveException e
(is (= id (.get_msg e)))
)
)
(is (thrown? NotAliveException (.getTopologyConf nimbus id)))
(try (.getTopologyConf nimbus id)
(catch NotAliveException e
(is (= id (.get_msg e)))
)
)
(is (thrown? NotAliveException (.getTopologyInfo nimbus id)))
(try (.getTopologyInfo nimbus id)
(catch NotAliveException e
(is (= id (.get_msg e)))
)
)
(is (thrown? NotAliveException (.getUserTopology nimbus id)))
(try (.getUserTopology nimbus id)
(catch NotAliveException e
(is (= id (.get_msg e)))
)
)
)
)
)
(deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
(with-local-cluster [cluster]
(let [
nimbus (:nimbus cluster)
bogus-secs 42
bogus-type "bogusType"
bogus-bases {
"1" nil
"2" {:launch-time-secs bogus-secs
:storm-name "id2-name"
:status {:type bogus-type}}
"3" nil
"4" {:launch-time-secs bogus-secs
:storm-name "id4-name"
:status {:type bogus-type}}
}
]
(stubbing [topology-bases bogus-bases]
(let [topos (.get_topologies (.getClusterInfo nimbus))]
; The number of topologies in the summary is correct.
(is (= (count
(filter (fn [b] (second b)) bogus-bases)) (count topos)))
; Each topology present has a valid name.
(is (empty?
(filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos)))
; The topologies are those with valid bases.
(is (empty?
(filter (fn [t]
(or
(nil? t)
(not (number? (read-string (.get_id t))))
(odd? (read-string (.get_id t)))
)) topos)))
)
)
)
)
)
(deftest test-defserverfn-numbus-iface-instance
(test-nimbus-iface-submitTopologyWithOpts-checks-authorization)
(test-nimbus-iface-methods-check-authorization)
(test-nimbus-iface-getTopology-methods-throw-correctly)
(test-nimbus-iface-getClusterInfo-filters-topos-without-bases)
)
(deftest test-nimbus-data-acls
(testing "nimbus-data uses correct ACLs"
(let [scheme "digest"
digest "storm:thisisapoorpassword"
auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
STORM-ZOOKEEPER-AUTH-PAYLOAD digest
NIMBUS-THRIFT-PORT 6666}
expected-acls nimbus/NIMBUS-ZK-ACLS
fake-inimbus (reify INimbus (getForcedScheduler [this] nil))]
(stubbing [mk-authorization-handler nil
cluster/mk-storm-cluster-state nil
nimbus/file-cache-map nil
uptime-computer nil
new-instance nil
mk-timer nil
nimbus/mk-code-distributor nil
zk-leader-elector nil
nimbus/mk-scheduler nil]
(nimbus/nimbus-data auth-conf fake-inimbus)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
(verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
expected-acls)))))
(deftest test-file-bogus-download
(with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [nimbus (:nimbus cluster)]
(is (thrown-cause? AuthorizationException (.beginFileDownload nimbus nil)))
(is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "")))
(is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "/bogus-path/foo")))
)))
(deftest test-validate-topo-config-on-submit
(with-local-cluster [cluster]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology {} {})
bad-config {"topology.isolate.machines" "2"}]
; Fake good authorization as part of setup.
(mocking [nimbus/check-authorization!]
(is (thrown-cause? InvalidTopologyException
(submit-local-topology-with-opts nimbus "test" bad-config topology
(SubmitOptions.))))))))
(deftest test-stateless-with-scheduled-topology-to-be-killed
; tests regression of STORM-856
(with-inprocess-zookeeper zk-port
(with-local-tmp [nimbus-dir]
(letlocals
(bind conf (merge (read-storm-config)
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT zk-port
STORM-LOCAL-DIR nimbus-dir}))
(bind cluster-state (cluster/mk-storm-cluster-state conf))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(sleep-secs 1)
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
; make transition for topology t1 to be killed -> nimbus applies this event to cluster state
(.killTopology nimbus "t1")
; shutdown nimbus immediately to achieve nimbus doesn't handle event right now
(.shutdown nimbus)
; in startup of nimbus it reads cluster state and take proper actions
; in this case nimbus registers topology transition event to scheduler again
; before applying STORM-856 nimbus was killed with NPE
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(.shutdown nimbus)
(.disconnect cluster-state)
))))
(deftest test-debug-on-component
(with-local-cluster [cluster]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology
{"spout" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{})]
(submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
(.debug nimbus "t1" "spout" true 100))))
(deftest test-debug-on-global
(with-local-cluster [cluster]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology
{"spout" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{})]
(submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
(.debug nimbus "t1" "" true 100))))