blob: b17ea5ee302d96985cce6a28e4e15abea4289730 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.supervisor-test
(:use [clojure test])
(:require [conjure.core])
(:use [conjure core])
(:require [clojure.contrib [string :as contrib-str]])
(:require [clojure [string :as string] [set :as set]])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout])
(:import [backtype.storm.scheduler ISupervisor])
(:import [backtype.storm.generated RebalanceOptions])
(:import [java.util UUID])
(:use [backtype.storm config testing util timer])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker] [supervisor :as supervisor]]
[backtype.storm [thrift :as thrift] [cluster :as cluster]])
(:use [conjure core])
(:require [clojure.java.io :as io]))
(defn worker-assignment
  "Find the topology occupying the slot [supervisor-id port].

  Walks every assignment listed by the cluster's :storm-cluster-state and
  returns [storm-id executors] for the first topology whose
  :executor->node+port mapping (reversed) has an entry for that slot.
  Calls throw-runtime when no assignment covers the slot."
  [cluster supervisor-id port]
  (let [cluster-state (:storm-cluster-state cluster)
        slot [supervisor-id port]
        ;; storm-id -> [storm-id executors], or nil when the topology has
        ;; nothing assigned to the requested slot
        slot-entry (fn [storm-id]
                     (let [slot->executors (reverse-map
                                             (:executor->node+port
                                               (.assignment-info cluster-state storm-id nil)))]
                       (when-let [executors (get slot->executors slot)]
                         [storm-id executors])))
        match (find-first not-nil?
                          (map slot-entry (.assignments cluster-state nil)))]
    (or match
        (throw-runtime "Could not find assignment for worker"))))
(defn heartbeat-worker
  "Emit one heartbeat on behalf of the worker at `port`.

  Reads the supervisor's conf, resolves the worker id for the port with
  find-worker-id, and passes a worker-shaped map to worker/do-heartbeat."
  [supervisor port storm-id executors]
  (let [conf (.get-conf supervisor)
        worker-id (find-worker-id conf port)
        fake-worker {:conf conf
                     :port port
                     :storm-id storm-id
                     :executors executors
                     :worker-id worker-id}]
    (worker/do-heartbeat fake-worker)))
(defn heartbeat-workers
  "Heartbeat every worker slot in `ports` on the given supervisor.

  For each port, the current [storm-id executors] assignment is looked up
  with worker-assignment and forwarded to heartbeat-worker."
  [cluster supervisor-id ports]
  (let [sup (get-supervisor cluster supervisor-id)]
    (doseq [port ports
            :let [[storm-id executors] (worker-assignment cluster supervisor-id port)]]
      (heartbeat-worker sup port storm-id executors))))
(defn validate-launched-once
  "Assert that workers were launched exactly once, on the expected slots.

  `launched` maps [supervisor port] to the collection of storm ids launched
  on that slot. Two assertions are made:
    * every slot in `launched` holds exactly one entry, and
    * the slots that launched `storm-id`, grouped as supervisor -> #{ports},
      equal `supervisor->ports` (its port collections are turned into sets)."
  [launched supervisor->ports storm-id]
  (let [counts (map count (vals launched))
        launched-storm? (fn [[_ sids]] (some #(= % storm-id) sids))
        ;; apply merge-with yields nil (not {}) when nothing matched
        launched-supervisor->ports (->> launched
                                        (filter launched-storm?)
                                        (map (fn [[[s p] _]] {s #{p}}))
                                        (apply merge-with set/union))
        supervisor->ports (map-val set supervisor->ports)]
    (is (every? #(= 1 %) counts))
    (is (= launched-supervisor->ports supervisor->ports))))
;; Exercises one supervisor ("sup1") against a mocked assignment under
;; simulated time: workers must be launched once per assigned slot, stay
;; untouched while they keep heartbeating, and a slot that stops
;; heartbeating must be shut down once and launched again.
(deftest launches-assignment
(with-simulated-time-local-cluster [cluster :supervisors 0
:daemon-conf {NIMBUS-REASSIGN false
SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
SUPERVISOR-WORKER-TIMEOUT-SECS 15
SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
{}))
(bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
;; Phase 1: submit the assignment; four executors land on sup1 ports 1, 2
;; and 3 (port 3 hosts two of them).
;; NOTE(review): the two maps below look like task->component and
;; executor->[supervisor port] -- confirm against submit-mocked-assignment.
(bind changed (capture-changed-workers
(submit-mocked-assignment
(:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 3}
topology
{1 "1"
2 "1"
3 "1"
4 "1"}
{[1 1] ["sup1" 1]
[2 2] ["sup1" 2]
[3 3] ["sup1" 3]
[4 4] ["sup1" 3]
})
(advance-cluster-time cluster 2)
(heartbeat-workers cluster "sup1" [1 2 3])
(advance-cluster-time cluster 10)))
(bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
;; Nothing shut down; ports 1, 2 and 3 each launched exactly once.
(is (empty? (:shutdown changed)))
(validate-launched-once (:launched changed) {"sup1" [1 2 3]} storm-id)
;; Phase 2: all three workers keep heartbeating -> no launches or shutdowns.
(bind changed (capture-changed-workers
(doseq [i (range 10)]
(heartbeat-workers cluster "sup1" [1 2 3])
(advance-cluster-time cluster 10))
))
(is (empty? (:shutdown changed)))
(is (empty? (:launched changed)))
;; Phase 3: port 3 no longer heartbeats; it is expected to be shut down once
;; and launched again.
(bind changed (capture-changed-workers
(heartbeat-workers cluster "sup1" [1 2])
(advance-cluster-time cluster 10)
))
(validate-launched-once (:launched changed) {"sup1" [3]} storm-id)
(is (= {["sup1" 3] 1} (:shutdown changed)))
)))
;; Runs two topologies ("test" and "test2") across two supervisors ("sup1",
;; "sup2") under simulated time. Checks that each topology's workers launch
;; once on their assigned slots, and that killing "test" eventually shuts down
;; only its workers while "test2" keeps running.
(deftest test-multiple-active-storms-multiple-supervisors
(with-simulated-time-local-cluster [cluster :supervisors 0
:daemon-conf {NIMBUS-REASSIGN false
SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
SUPERVISOR-WORKER-TIMEOUT-SECS 15
SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
(letlocals
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
{}))
(bind topology2 (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
(bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
(bind sup2 (add-supervisor cluster :id "sup2" :ports [1 2]))
;; Phase 1: topology "test" is placed on sup1 ports 1 and 2 and sup2 port 1.
(bind changed (capture-changed-workers
(submit-mocked-assignment
(:nimbus cluster)
"test"
{TOPOLOGY-WORKERS 3 TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}
topology
{1 "1"
2 "1"
3 "1"
4 "1"}
{[1 1] ["sup1" 1]
[2 2] ["sup1" 2]
[3 3] ["sup2" 1]
[4 4] ["sup2" 1]
})
(advance-cluster-time cluster 2)
(heartbeat-workers cluster "sup1" [1 2])
(heartbeat-workers cluster "sup2" [1])
))
(bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
(is (empty? (:shutdown changed)))
(validate-launched-once (:launched changed) {"sup1" [1 2] "sup2" [1]} storm-id)
;; Phase 2: topology "test2" is placed on sup1 port 3 and sup2 port 2.
(bind changed (capture-changed-workers
(submit-mocked-assignment
(:nimbus cluster)
"test2"
{TOPOLOGY-WORKERS 2}
topology2
{1 "1"
2 "1"
3 "1"}
{[1 1] ["sup1" 3]
[2 2] ["sup1" 3]
[3 3] ["sup2" 2]
})
(advance-cluster-time cluster 2)
(heartbeat-workers cluster "sup1" [3])
(heartbeat-workers cluster "sup2" [2])
))
(bind storm-id2 (get-storm-id (:storm-cluster-state cluster) "test2"))
(is (empty? (:shutdown changed)))
(validate-launched-once (:launched changed) {"sup1" [3] "sup2" [2]} storm-id2)
;; Phase 3: kill "test". For the next 4 x 8 simulated seconds, with every
;; worker still heartbeating, nothing may be launched or shut down.
;; NOTE(review): presumably this window is covered by the 40s
;; TOPOLOGY-MESSAGE-TIMEOUT-SECS set at submission -- confirm.
(bind changed (capture-changed-workers
(.killTopology (:nimbus cluster) "test")
(doseq [i (range 4)]
(advance-cluster-time cluster 8)
(heartbeat-workers cluster "sup1" [1 2 3])
(heartbeat-workers cluster "sup2" [1 2])
)))
(is (empty? (:shutdown changed)))
(is (empty? (:launched changed)))
;; Phase 4: after another 12 seconds the three workers of "test" are each
;; shut down once; nothing new is launched.
(bind changed (capture-changed-workers
(advance-cluster-time cluster 12)
))
(is (empty? (:launched changed)))
(is (= {["sup1" 1] 1 ["sup1" 2] 1 ["sup2" 1] 1} (:shutdown changed)))
;; Phase 5: the workers of "test2" keep heartbeating and remain untouched.
(bind changed (capture-changed-workers
(doseq [i (range 10)]
(heartbeat-workers cluster "sup1" [3])
(heartbeat-workers cluster "sup2" [2])
(advance-cluster-time cluster 10)
)))
(is (empty? (:shutdown changed)))
(is (empty? (:launched changed)))
;; TODO check that downloaded code is cleaned up only for the one storm
)))
(defn get-heartbeat
  "Fetch the supervisor-info record stored for `supervisor-id` in the
  cluster's :storm-cluster-state."
  [cluster supervisor-id]
  (-> cluster
      :storm-cluster-state
      (.supervisor-info supervisor-id)))
(defn check-heartbeat
  "Assert that the supervisor's latest heartbeat is fresh.

  The age of the heartbeat (current-time-secs minus its :time-secs) must be
  non-negative and no larger than `within-secs`."
  [cluster supervisor-id within-secs]
  (let [last-beat-secs (:time-secs (get-heartbeat cluster supervisor-id))
        age-secs (- (current-time-secs) last-beat-secs)]
    (is (>= age-secs 0))
    (is (<= age-secs within-secs))))
;; Checks that a supervisor keeps publishing heartbeats under simulated time:
;; the heartbeat's :meta must list the supervisor's ports, and its timestamp
;; must never be more than 3 seconds old (SUPERVISOR-HEARTBEAT-FREQUENCY-SECS
;; is 3), both before and after a topology is submitted.
(deftest heartbeats-to-nimbus
(with-simulated-time-local-cluster [cluster :supervisors 0
:daemon-conf {SUPERVISOR-WORKER-START-TIMEOUT-SECS 15
SUPERVISOR-HEARTBEAT-FREQUENCY-SECS 3}]
(letlocals
(bind sup1 (add-supervisor cluster :id "sup" :ports [5 6 7]))
(advance-cluster-time cluster 4)
(bind hb (get-heartbeat cluster "sup"))
;; :meta of the heartbeat is compared, as a set, with the ports passed above
(is (= #{5 6 7} (set (:meta hb))))
;; heartbeat stays fresh across short and long time jumps
(check-heartbeat cluster "sup" 3)
(advance-cluster-time cluster 3)
(check-heartbeat cluster "sup" 3)
(advance-cluster-time cluster 3)
(check-heartbeat cluster "sup" 3)
(advance-cluster-time cluster 15)
(check-heartbeat cluster "sup" 3)
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
{}))
;; prevent them from launching by capturing them
;; (same freshness checks, now with a topology submitted)
(capture-changed-workers
(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
(advance-cluster-time cluster 3)
(check-heartbeat cluster "sup" 3)
(advance-cluster-time cluster 3)
(check-heartbeat cluster "sup" 3)
(advance-cluster-time cluster 3)
(check-heartbeat cluster "sup" 3)
(advance-cluster-time cluster 20)
(check-heartbeat cluster "sup" 3))
)))
;; Verifies what supervisor/launch-worker passes to launch-process (stubbed,
;; so no process is started): the full java command line for the different
;; shapes of *.worker.childopts, the effect of topology.classpath, and the
;; environment derived from topology.environment.
(deftest test-worker-launch-command
  (testing "*.worker.childopts configuration"
    (let [mock-port "42"
          mock-storm-id "fake-storm-id"
          mock-worker-id "fake-worker-id"
          ;; Built from the platform separators. This is the only binding of
          ;; mock-cp: a second, hard-coded "/base:/stormjar.jar" binding would
          ;; silently shadow it and tie the test to POSIX separators.
          mock-cp (str file-path-separator "base" class-path-separator file-path-separator "stormjar.jar")
          mock-sensitivity "S3"
          ;; Expected argument list: a LogWriter java command followed by the
          ;; worker java command, with both childopts lists spliced in.
          exp-args-fn (fn [opts topo-opts classpath]
                        (concat [(supervisor/java-cmd) "-cp" classpath
                                 (str "-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log")
                                 "-Dstorm.home="
                                 (str "-Dstorm.id=" mock-storm-id)
                                 (str "-Dworker.id=" mock-worker-id)
                                 (str "-Dworker.port=" mock-port)
                                 ;; same separator-based form as the worker
                                 ;; section below, so both halves agree
                                 (str "-Dstorm.log.dir=" file-path-separator "logs")
                                 (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
                                 "backtype.storm.LogWriter"]
                                [(supervisor/java-cmd) "-server"]
                                opts
                                topo-opts
                                ["-Djava.library.path="
                                 (str "-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log")
                                 "-Dstorm.home="
                                 "-Dstorm.conf.file="
                                 "-Dstorm.options="
                                 (str "-Dstorm.log.dir=" file-path-separator "logs")
                                 (str "-Dlogging.sensitivity=" mock-sensitivity)
                                 (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
                                 (str "-Dstorm.id=" mock-storm-id)
                                 (str "-Dworker.id=" mock-worker-id)
                                 (str "-Dworker.port=" mock-port)
                                 "-cp" classpath
                                 "backtype.storm.daemon.worker"
                                 mock-storm-id
                                 mock-port
                                 mock-worker-id]))]
      (testing "testing *.worker.childopts as strings with extra spaces"
        (let [string-opts "-Dfoo=bar -Xmx1024m"
              topo-string-opts "-Dkau=aux -Xmx2048m"
              ;; string childopts are expected to arrive split into arguments
              exp-args (exp-args-fn ["-Dfoo=bar" "-Xmx1024m"]
                                    ["-Dkau=aux" "-Xmx2048m"]
                                    mock-cp)
              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                      WORKER-CHILDOPTS string-opts}}]
          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                                 topo-string-opts}
                     add-to-classpath mock-cp
                     supervisor-stormdist-root nil
                     launch-process nil
                     set-worker-user! nil
                     supervisor/jlp nil
                     supervisor/write-log-metadata! nil]
            (supervisor/launch-worker mock-supervisor
                                      mock-storm-id
                                      mock-port
                                      mock-worker-id)
            (verify-first-call-args-for-indices launch-process
                                                [0]
                                                exp-args))))
      (testing "testing *.worker.childopts as list of strings, with spaces in values"
        (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
              topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
              ;; list childopts are expected to pass through element by element
              exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                      WORKER-CHILDOPTS list-opts}}]
          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                                 topo-list-opts}
                     add-to-classpath mock-cp
                     supervisor-stormdist-root nil
                     launch-process nil
                     set-worker-user! nil
                     supervisor/jlp nil
                     supervisor/write-log-metadata! nil]
            (supervisor/launch-worker mock-supervisor
                                      mock-storm-id
                                      mock-port
                                      mock-worker-id)
            (verify-first-call-args-for-indices launch-process
                                                [0]
                                                exp-args))))
      (testing "testing topology.classpath is added to classpath"
        (let [topo-cp (str file-path-separator "any" file-path-separator "path")
              ;; add-to-classpath is NOT stubbed here: the expectation is
              ;; computed with the real function, before stubbing begins
              exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
          (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
                     supervisor-stormdist-root nil
                     supervisor/jlp nil
                     set-worker-user! nil
                     supervisor/write-log-metadata! nil
                     launch-process nil
                     current-classpath (str file-path-separator "base")]
            (supervisor/launch-worker mock-supervisor
                                      mock-storm-id
                                      mock-port
                                      mock-worker-id)
            (verify-first-call-args-for-indices launch-process
                                                [0]
                                                exp-args))))
      (testing "testing topology.environment is added to environment for worker launch"
        (let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
              ;; expected environment: topology entries plus LD_LIBRARY_PATH
              ;; (nil because supervisor/jlp is stubbed to nil)
              full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
          (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
                     supervisor-stormdist-root nil
                     supervisor/jlp nil
                     launch-process nil
                     set-worker-user! nil
                     supervisor/write-log-metadata! nil
                     current-classpath (str file-path-separator "base")]
            (supervisor/launch-worker mock-supervisor
                                      mock-storm-id
                                      mock-port
                                      mock-worker-id)
            ;; only argument index 2 of launch-process is compared here
            (verify-first-call-args-for-indices launch-process
                                                [2]
                                                full-env)))))))
(defn rm-r
  "Recursively delete `f`, a java.io.File.

  When `f` is a directory its children are deleted first, then the
  directory itself. Returns the result of the final .delete call on `f`
  (true when `f` was removed)."
  [f]
  (when (.isDirectory f)
    ;; doseq rather than for: `for` builds a lazy seq, and since nobody
    ;; consumes the result the recursive deletes would never run.
    ;; .listFiles may return nil on an I/O error; doseq treats that as empty.
    (doseq [sub (.listFiles f)]
      (rm-r sub)))
  ;; a directory can only be deleted once it is empty, so this comes last
  (.delete f))
;; Run-as-user variant of the worker launch test
;; (SUPERVISOR-RUN-WORKER-AS-USER true). launch-process is stubbed; the test
;; checks that it receives the worker-launcher command line and that the
;; script written under <storm-local>/workers/<worker-id>/ matches the
;; expected text exactly.
(deftest test-worker-launch-command-run-as-user
(testing "*.worker.childopts configuration"
(let [mock-port "42"
mock-storm-id "fake-storm-id"
mock-worker-id "fake-worker-id"
mock-sensitivity "S3"
;; contains a single quote so that shell quoting of the classpath is exercised
mock-cp "mock-classpath'quote-on-purpose"
;; unique scratch directory per run; removed in the finally clause below
storm-local (str "/tmp/" (UUID/randomUUID))
worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
;; expected first argument of launch-process
exp-launch ["/bin/worker-launcher"
"me"
"worker"
(str storm-local "/workers/" mock-worker-id)
worker-script]
;; Expected script text. Every argument is single-quoted; the quote inside
;; mock-cp appears escaped as '"'"' in the two '-cp' entries.
exp-script-fn (fn [opts topo-opts]
(str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
" '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
" '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
" '-Dstorm.home='"
" '-Dstorm.id=" mock-storm-id "'"
" '-Dworker.id=" mock-worker-id "'"
" '-Dworker.port=" mock-port "'"
" '-Dstorm.log.dir=/logs'"
" '-Dlog4j.configurationFile=/log4j2/worker.xml'"
" 'backtype.storm.LogWriter'"
" 'java' '-server'"
" " (shell-cmd opts)
" " (shell-cmd topo-opts)
" '-Djava.library.path='"
" '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
" '-Dstorm.home='"
" '-Dstorm.conf.file='"
" '-Dstorm.options='"
" '-Dstorm.log.dir=/logs'"
" '-Dlogging.sensitivity=" mock-sensitivity "'"
" '-Dlog4j.configurationFile=/log4j2/worker.xml'"
" '-Dstorm.id=" mock-storm-id "'"
" '-Dworker.id=" mock-worker-id "'"
" '-Dworker.port=" mock-port "'"
" '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
" 'backtype.storm.daemon.worker'"
" '" mock-storm-id "'"
" '" mock-port "'"
" '" mock-worker-id "';"))]
;; the worker directory must exist before the script can be written into it
(.mkdirs (io/file storm-local "workers" mock-worker-id))
(try
(testing "testing *.worker.childopts as strings with extra spaces"
(let [string-opts "-Dfoo=bar -Xmx1024m"
topo-string-opts "-Dkau=aux -Xmx2048m"
exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
["-Dkau=aux" "-Xmx2048m"])
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
STORM-LOCAL-DIR storm-local
SUPERVISOR-RUN-WORKER-AS-USER true
WORKER-CHILDOPTS string-opts}}]
(stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-string-opts
TOPOLOGY-SUBMITTER-USER "me"}
add-to-classpath mock-cp
supervisor-stormdist-root nil
launch-process nil
set-worker-user! nil
supervisor/java-cmd "java"
supervisor/jlp nil
supervisor/write-log-metadata! nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
mock-worker-id)
(verify-first-call-args-for-indices launch-process
[0]
exp-launch))
;; the script file is read back from disk and compared verbatim
(is (= (slurp worker-script) exp-script))))
(testing "testing *.worker.childopts as list of strings, with spaces in values"
(let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
exp-script (exp-script-fn list-opts topo-list-opts)
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
STORM-LOCAL-DIR storm-local
SUPERVISOR-RUN-WORKER-AS-USER true
WORKER-CHILDOPTS list-opts}}]
(stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-list-opts
TOPOLOGY-SUBMITTER-USER "me"}
add-to-classpath mock-cp
supervisor-stormdist-root nil
launch-process nil
set-worker-user! nil
supervisor/java-cmd "java"
supervisor/jlp nil
supervisor/write-log-metadata! nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
mock-worker-id)
(verify-first-call-args-for-indices launch-process
[0]
exp-launch))
(is (= (slurp worker-script) exp-script))))
;; clean up the scratch directory whether or not the assertions passed
(finally (rm-r (io/file storm-local)))
))))
(deftest test-workers-go-bananas
;; TODO: not implemented yet; the body is empty and asserts nothing.
;; Intended coverage: test that multiple workers are started for a port, and
;; test that the supervisor shuts down properly (doesn't shut down the most
;; recently launched one, checks heartbeats correctly, etc.)
)
(deftest downloads-code
;; TODO: placeholder with an empty body; nothing is asserted yet.
)
(deftest test-stateless
;; TODO: placeholder with an empty body; nothing is asserted yet.
)
(deftest cleans-up-on-unassign
;; Placeholder with an empty body; nothing is asserted yet.
;; TODO just do a reassign, and check that it cleans up worker states after
;; killing but doesn't get rid of downloaded code
)
;; With ZooKeeper auth configured, supervisor/supervisor-data must create the
;; cluster state exactly once and pass supervisor/SUPERVISOR-ZK-ACLS as the
;; argument at index 2 of cluster/mk-storm-cluster-state.
(deftest test-supervisor-data-acls
  (testing "supervisor-data uses correct ACLs"
    (let [auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME "digest"
                     STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"}
          ;; minimal ISupervisor whose ids are both nil
          fake-isupervisor (reify ISupervisor
                             (getSupervisorId [this] nil)
                             (getAssignmentId [this] nil))
          expected-acls supervisor/SUPERVISOR-ZK-ACLS]
      ;; every collaborator of supervisor-data is stubbed to nil so that only
      ;; the call into mk-storm-cluster-state is observed
      (stubbing [uptime-computer nil
                 cluster/mk-storm-cluster-state nil
                 supervisor-state nil
                 local-hostname nil
                 supervisor/mk-code-distributor nil
                 mk-timer nil]
        (supervisor/supervisor-data auth-conf nil fake-isupervisor)
        (verify-call-times-for cluster/mk-storm-cluster-state 1)
        (verify-first-call-args-for-indices cluster/mk-storm-cluster-state
                                            [2]
                                            expected-acls)))))
;; supervisor/write-log-metadata! must hand the yaml writer (mocked here) a
;; map holding the submitter, the worker id, and the expected users and
;; groups lists built from the topology's and the logs' users/groups.
(deftest test-write-log-metadata
  (testing "supervisor writes correct data to logs metadata file"
    (let [owner "alice"
          worker-id "42"
          storm-id "0123456789"
          port 4242
          conf {}
          storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
                      TOPOLOGY-USERS ["charlie" "bob"]
                      TOPOLOGY-GROUPS ["special-group"]
                      LOGS-GROUPS ["read-only-group"]
                      LOGS-USERS ["daryl"]}
          ;; the expected users and groups lists are in sorted order
          expected-metadata {TOPOLOGY-SUBMITTER-USER owner
                             "worker-id" worker-id
                             LOGS-USERS ["bob" "charlie" "daryl"]
                             LOGS-GROUPS ["read-only-group" "special-group"]}]
      (mocking [supervisor/write-log-metadata-to-yaml-file!]
        (supervisor/write-log-metadata! storm-conf owner worker-id
                                        storm-id port conf)
        (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
                                      storm-id port expected-metadata conf)))))
;; supervisor/worker-launcher is called with an empty conf, a nil second
;; argument and "" as the last argument; it must fail with an
;; IllegalArgumentException (possibly wrapped as a cause) whose message
;; matches "user cannot be blank", case-insensitively.
(deftest test-worker-launcher-requires-user
(testing "worker-launcher throws on blank user"
;; launch-process is mocked so no real process can be started
(mocking [launch-process]
(is (thrown-cause-with-msg? java.lang.IllegalArgumentException
#"(?i).*user cannot be blank.*"
(supervisor/worker-launcher {} nil ""))))))
(defn found?
  "True when `sub-str` occurs in `input-str`.

  `input-str` may be a single string, or a collection of strings, in which
  case any one element containing `sub-str` is enough. Always returns a
  boolean."
  [sub-str input-str]
  (let [contains-sub? (partial contrib-str/substring? sub-str)]
    (if (string? input-str)
      (contains-sub? (str input-str))
      (boolean (some contains-sub? input-str)))))
(defn not-found?
  "Logical negation of found?: true when `sub-str` occurs nowhere in
  `input-str` (a string or a collection of strings). Returns a boolean."
  [sub-str input-str]
  ;; `not`, not `complement`: complement expects a predicate *function* and
  ;; returns a new function. Applied to the boolean result of found? it
  ;; produced a function object, which is always truthy, so any assertion
  ;; built on it passed unconditionally.
  (not (found? sub-str input-str)))
;; String childopts: every %...% placeholder is replaced and the string is
;; returned split into separate options.
(deftest test-substitute-childopts-happy-path-string
  (testing "worker-launcher replaces ids in childopts"
    (let [raw-opts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m"
          substituted (supervisor/substitute-childopts raw-opts "w-01" "s-01" 9999)]
      (is (= '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m")
             substituted)))))
;; List childopts: placeholders are replaced inside each element and the
;; list shape is kept.
(deftest test-substitute-childopts-happy-path-list
  (testing "worker-launcher replaces ids in childopts"
    (let [raw-opts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m")
          substituted (supervisor/substitute-childopts raw-opts "w-01" "s-01" 9999)]
      (is (= '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m")
             substituted)))))
;; Only %TOPOLOGY-ID% is present: it alone is replaced.
(deftest test-substitute-childopts-topology-id-alone
  (testing "worker-launcher replaces ids in childopts"
    (let [raw-opts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
          substituted (supervisor/substitute-childopts raw-opts "w-01" "s-01" 9999)]
      (is (= '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
             substituted)))))
;; No placeholders at all: the option comes back unchanged, wrapped in a list.
(deftest test-substitute-childopts-no-keys
  (testing "worker-launcher has no ids to replace in childopts"
    (let [raw-opts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
          substituted (supervisor/substitute-childopts raw-opts "w-01" "s-01" 9999)]
      (is (= '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
             substituted)))))
;; nil childopts: the result is nil as well.
(deftest test-substitute-childopts-nil-childopts
  (testing "worker-launcher has nil childopts"
    (let [substituted (supervisor/substitute-childopts nil "w-01" "s-01" 9999)]
      (is (= nil substituted)))))
;; nil worker id: %WORKER-ID% is replaced by the empty string, which leaves
;; "--" in the expected file name.
(deftest test-substitute-childopts-nil-ids
  (testing "worker-launcher has nil ids"
    (let [raw-opts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
          substituted (supervisor/substitute-childopts raw-opts nil "s-01" 9999)]
      (is (= '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
             substituted)))))
;; Two topologies are given mocked assignments that collide on the same
;; slots (sup1 ports 1 and 2). While the collision exists no worker may be
;; launched; after "topology2" is rebalanced, each topology must be launched
;; exactly once on its own pair of ports.
(deftest test-retry-read-assignments
(with-simulated-time-local-cluster [cluster
:supervisors 0
:ports-per-supervisor 2
:daemon-conf {NIMBUS-REASSIGN false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0}]
(letlocals
(bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
(bind topology1 (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
{}))
(bind topology2 (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
{}))
(bind state (:storm-cluster-state cluster))
;; Phase 1: both topologies are assigned to ["sup1" 1] and ["sup1" 2].
(bind changed (capture-changed-workers
(submit-mocked-assignment
(:nimbus cluster)
"topology1"
{TOPOLOGY-WORKERS 2}
topology1
{1 "1"
2 "1"}
{[1 1] ["sup1" 1]
[2 2] ["sup1" 2]
})
(submit-mocked-assignment
(:nimbus cluster)
"topology2"
{TOPOLOGY-WORKERS 2}
topology2
{1 "1"
2 "1"}
{[1 1] ["sup1" 1]
[2 2] ["sup1" 2]
})
(advance-cluster-time cluster 10)
))
;; with the conflicting assignments in place nothing is launched
(is (empty? (:launched changed)))
;; Phase 2: rebalance topology2 with a wait of 0 seconds.
(bind options (RebalanceOptions.))
(.set_wait_secs options 0)
(bind changed (capture-changed-workers
(.rebalance (:nimbus cluster) "topology2" options)
(advance-cluster-time cluster 10)
(heartbeat-workers cluster "sup1" [1 2 3 4])
(advance-cluster-time cluster 10)
))
;; topology1 is expected on ports 1 and 2, topology2 on ports 3 and 4
(validate-launched-once (:launched changed)
{"sup1" [1 2]}
(get-storm-id (:storm-cluster-state cluster) "topology1"))
(validate-launched-once (:launched changed)
{"sup1" [3 4]}
(get-storm-id (:storm-cluster-state cluster) "topology2"))
)))