;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.nimbus-test
(:use [clojure test])
(:require [org.apache.storm [util :as util]])
(:import [java.util.function UnaryOperator])
(:import [org.apache.storm.testing InProcessZookeeper MockLeaderElector TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
[org.apache.storm.blobstore BlobStore]
[org.apache.storm.nimbus InMemoryTopologyActionNotifier]
[org.apache.storm.daemon.nimbus TopoCache Nimbus Nimbus$StandaloneINimbus]
[org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
[org.apache.storm LocalCluster LocalCluster$Builder Thrift MockAutoCred Testing Testing$Condition]
[org.apache.storm.stats BoltExecutorStats StatsUtil ClientStatsUtil]
[org.apache.storm.security.auth IGroupMappingServiceProvider IAuthorizer])
(:import [org.apache.storm.testing.staticmocking MockedZookeeper])
(:import [org.apache.storm.testing TmpPath])
(:import [org.apache.storm.scheduler INimbus])
(:import [org.mockito Mockito Matchers])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
(:import [org.apache.storm.testing.staticmocking MockedCluster])
(:import [org.apache.storm.generated Credentials NotAliveException SubmitOptions
TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions
InvalidTopologyException AuthorizationException
LogConfig LogLevel LogLevelAction Assignment NodeInfo])
(:import [java.util Map HashMap HashSet Optional])
(:import [java.io File])
(:import [javax.security.auth Subject])
(:import [org.apache.storm.utils Time Time$SimulatedTime IPredicate StormCommonInstaller Utils$UptimeComputer ReflectionUtils Utils ConfigUtils ServerConfigUtils]
[org.apache.storm.utils.staticmocking ServerConfigUtilsInstaller ReflectionUtilsInstaller UtilsInstaller])
(:import [org.apache.storm.zookeeper Zookeeper])
(:import [org.apache.commons.io FileUtils])
(:import [org.json.simple JSONValue])
(:import [org.apache.storm.daemon StormCommon])
(:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils]
[org.apache.storm.assignments LocalAssignmentsBackendFactory])
(:import [org.apache.storm.metric StormMetricsRegistry])
(:use [org.apache.storm util daemon-config config log])
(:require [conjure.core])
(:use [conjure core]))
(def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
(defn- mk-nimbus
([conf inimbus]
(mk-nimbus conf inimbus nil nil nil nil))
([conf inimbus blob-store leader-elector group-mapper cluster-state]
;; the blacklist scheduler requires nimbus-monitor-freq-secs as an input parameter.
(let [conf-with-nimbus-monitor-freq (merge {NIMBUS-MONITOR-FREQ-SECS 10} conf)]
(Nimbus. conf-with-nimbus-monitor-freq inimbus cluster-state nil blob-store nil leader-elector group-mapper (StormMetricsRegistry.)))))
(defn- from-json
[^String str]
(if str
(clojurify-structure
(JSONValue/parse str))
nil))
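;; Returns a map of component id -> task ids for the topology named storm-name,
;; built by reversing the task -> component mapping from StormCommon/stormTaskInfo.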
(defn storm-component->task-info [cluster storm-name]
(let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
nimbus (.getNimbus cluster)]
(-> (.getUserTopology nimbus storm-id)
(#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
(Utils/reverseMap)
clojurify-structure)))
(defn getCredentials [cluster storm-name]
(let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
creds (.credentials (.getClusterState cluster) storm-id nil)]
(if creds (into {} (.get_creds creds)))))
(defn storm-component->executor-info [cluster storm-name]
(let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
nimbus (.getNimbus cluster)
storm-conf (from-json (.getTopologyConf nimbus storm-id))
topology (.getUserTopology nimbus storm-id)
task->component (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf))
state (.getClusterState cluster)
get-component (comp task->component first)]
(->> (.assignmentInfo state storm-id nil)
.get_executor_node_port
.keySet
clojurify-structure
(map (fn [e] {e (get-component e)}))
(apply merge)
(Utils/reverseMap)
clojurify-structure)))
(defn storm-num-workers [state storm-name]
(let [storm-id (.get (.getTopoId state storm-name))
assignment (.assignmentInfo state storm-id nil)]
(.size (Utils/reverseMap (.get_executor_node_port assignment)))))
(defn topology-nodes [state storm-name]
(let [storm-id (.get (.getTopoId state storm-name))
assignment (.assignmentInfo state storm-id nil)]
(->> assignment
.get_executor_node_port
.values
(map (fn [np] (.get_node np)))
set
)))
(defn topology-slots [state storm-name]
(let [storm-id (.get (.getTopoId state storm-name))
assignment (.assignmentInfo state storm-id nil)]
(->> assignment
.get_executor_node_port
.values
set
)))
;TODO: when translating this function, don't call map-val, but instead use an inline for loop.
; map-val is a temporary kluge for clojure.
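;; Returns a map of slots-per-node -> number of nodes holding that many assigned slots,
;; e.g. {2 3} means three nodes each hold two slots.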
(defn topology-node-distribution [state storm-name]
(let [storm-id (.get (.getTopoId state storm-name))
assignment (.assignmentInfo state storm-id nil)]
(->> assignment
.get_executor_node_port
.values
set
(group-by (fn [np] (.get_node np)))
(map-val count)
(map (fn [[_ amt]] {amt 1}))
(apply merge-with +)
)))
(defn topology-num-nodes [state storm-name]
(count (topology-nodes state storm-name)))
(defn executor-assignment [cluster storm-id executor-id]
(let [state (.getClusterState cluster)
assignment (.assignmentInfo state storm-id nil)]
(.get (.get_executor_node_port assignment) executor-id)
))
(defn executor-start-times [cluster storm-id]
(let [state (.getClusterState cluster)
assignment (.assignmentInfo state storm-id nil)]
(clojurify-structure (.get_executor_start_time_secs assignment))))
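;; Simulates a heartbeat for the given executor: writes fake BoltExecutorStats into the
;; worker heartbeat in cluster state and also reports the heartbeat to nimbus over RPC.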
(defn do-executor-heartbeat [cluster storm-id executor]
(let [state (.getClusterState cluster)
executor->node+port (.get_executor_node_port (.assignmentInfo state storm-id nil))
np (.get executor->node+port executor)
node (.get_node np)
port (first (.get_port np))
curr-beat (StatsUtil/convertZkWorkerHb (.getWorkerHeartbeat state storm-id node port))
stats (or (get curr-beat "executor-stats")
(HashMap.))]
(log-warn "curr-beat:" (prn-str curr-beat) ",stats:" (prn-str stats))
(log-warn "stats type:" (type stats))
(.put stats (ClientStatsUtil/convertExecutor executor) (.renderStats (BoltExecutorStats. 20 (*STORM-CONF* NUM-STAT-BUCKETS))))
(log-warn "merged:" stats)
(.workerHeartbeat state storm-id node port
(ClientStatsUtil/thriftifyZkWorkerHb (ClientStatsUtil/mkZkWorkerHb storm-id stats (int 10))))
(.sendSupervisorWorkerHeartbeat (.getNimbus cluster) (StatsUtil/thriftifyRpcWorkerHb storm-id executor))))
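;; Returns a map of node+port slot -> executor ids assigned to that slot.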
(defn slot-assignments [cluster storm-id]
(let [state (.getClusterState cluster)
assignment (.assignmentInfo state storm-id nil)]
(clojurify-structure (Utils/reverseMap (.get_executor_node_port assignment)))))
(defn task-ids [cluster storm-id]
(let [nimbus (.getNimbus cluster)]
(-> (.getUserTopology nimbus storm-id)
(#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
clojurify-structure
keys)))
(defn topology-executors [cluster storm-id]
(let [state (.getClusterState cluster)
assignment (.assignmentInfo state storm-id nil)
ret-keys (keys (.get_executor_node_port assignment))
_ (log-message "ret-keys: " (pr-str ret-keys)) ]
ret-keys
))
(defn check-distribution [items distribution]
(let [counts (map long (map count items))]
(is (Testing/multiseteq counts (map long distribution)))))
(defn disjoint? [& sets]
(let [combined (apply concat sets)]
(= (count combined) (count (set combined)))
))
(defn executor->tasks [executor-id]
(clojurify-structure (StormCommon/executorIdToTasks executor-id)))
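;; Asserts that the stored assignment for storm-name is internally consistent:
;; every task is assigned (when :assigned? is true), every executor has a node+port,
;; every assigned node appears in node->host, and every executor has a start time.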
(defnk check-consistency [cluster storm-name :assigned? true]
(let [state (.getClusterState cluster)
storm-id (.get (.getTopoId state storm-name))
task-ids (task-ids cluster storm-id)
assignment (.assignmentInfo state storm-id nil)
executor->node+port (.get_executor_node_port assignment)
task->node+port (StormCommon/taskToNodeport executor->node+port)
assigned-task-ids (mapcat executor->tasks (keys executor->node+port))
all-nodes (set (map (fn [np] (.get_node np)) (.values executor->node+port)))]
(when assigned?
(is (= (sort task-ids) (sort assigned-task-ids)))
(doseq [t task-ids]
(is (not-nil? (.get task->node+port t)))))
(doseq [[e s] executor->node+port]
(is (not-nil? s)))
(is (= all-nodes (set (keys (.get_node_host assignment)))))
(doseq [[e s] executor->node+port]
(is (not-nil? (.get (.get_executor_start_time_secs assignment) e))))
))
(deftest test-bogusId
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSupervisors 4)
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [state (.getClusterState cluster)
nimbus (.getNimbus cluster)]
(is (thrown? NotAliveException (.getTopologyConf nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopologyInfo nimbus "bogus-id")))
(is (thrown? NotAliveException (.uploadNewCredentials nimbus "bogus-id" (Credentials.))))
)))
(deftest test-assignment
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 4)
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [state (.getClusterState cluster)
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. false) (Integer. 3))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 4))
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.))})
topology2 (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 12))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 6))
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareGlobalGrouping)}
(TestPlannerBolt.) (Integer. 8))
"4" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareGlobalGrouping)
(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 4))})
_ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
_ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
;; 3 should be assigned once (if it were optimized, we'd have
;; a different topology)
(is (= 1 (count (.assignments state nil))))
(is (= 1 (count (task-info "1"))))
(is (= 4 (count (task-info "2"))))
(is (= 1 (count (task-info "3"))))
(is (= 4 (storm-num-workers state "mystorm")))
(.submitTopology cluster "storm2" {TOPOLOGY-WORKERS 20} topology2)
(.advanceClusterTime cluster 11)
(check-consistency cluster "storm2")
(is (= 2 (count (.assignments state nil))))
(let [task-info (storm-component->task-info cluster "storm2")]
(is (= 12 (count (task-info "1"))))
(is (= 6 (count (task-info "2"))))
(is (= 8 (count (task-info "3"))))
(is (= 4 (count (task-info "4"))))
(is (= 8 (storm-num-workers state "storm2")))
)
)))
(defn isolation-nimbus []
(let [standalone (Nimbus$StandaloneINimbus.)]
(reify INimbus
(prepare [this conf local-dir]
(.prepare standalone conf local-dir)
)
(allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
(.allSlotsAvailableForScheduling standalone supervisors topologies topologies-missing-assignments))
(assignSlots [this topology slots]
(.assignSlots standalone topology slots)
)
(getForcedScheduler [this]
(.getForcedScheduler standalone))
(getHostName [this supervisors node-id]
node-id
))))
(deftest test-auto-credentials
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 6)
(.withDaemonConf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
NIMBUS-CREDENTIAL-RENEW-FREQ-SECS 10
NIMBUS-CREDENTIAL-RENEWERS (list "org.apache.storm.MockAutoCred")
NIMBUS-AUTO-CRED-PLUGINS (list "org.apache.storm.MockAutoCred")
})))]
(let [state (.getClusterState cluster)
topology-name "test-auto-cred-storm"
submitOptions (SubmitOptions. TopologyInitialStatus/INACTIVE)
_ (.set_creds submitOptions (Credentials. (HashMap.)))
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. false) (Integer. 3))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 4))
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.))})
_ (.submitTopologyWithOpts cluster topology-name {TOPOLOGY-WORKERS 4
TOPOLOGY-AUTO-CREDENTIALS (list "org.apache.storm.MockAutoCred")
} topology submitOptions)
credentials (getCredentials cluster topology-name)]
;; check that the credentials contain the nimbus auto-generated credential
(is (= (.get credentials MockAutoCred/NIMBUS_CRED_KEY) MockAutoCred/NIMBUS_CRED_VAL))
;advance cluster time so the renewers can execute
(.advanceClusterTime cluster 20)
;check that renewed credentials replace the original credential.
(is (= (.get (getCredentials cluster topology-name) MockAutoCred/NIMBUS_CRED_KEY) MockAutoCred/NIMBUS_CRED_RENEW_VAL))
(is (= (.get (getCredentials cluster topology-name) MockAutoCred/GATEWAY_CRED_KEY) MockAutoCred/GATEWAY_CRED_RENEW_VAL)))))
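;; letlocals lets a test interleave (bind sym expr) forms with plain side-effecting
;; expressions: each bind becomes a let binding, non-bind forms are bound to _, and
;; the final form is the body. For example,
;; (letlocals
;;   (bind state (.getClusterState cluster))
;;   (.advanceClusterTime cluster 11)
;;   (check-consistency cluster "test"))
;; expands to
;; (let [state (.getClusterState cluster)
;;       _ (.advanceClusterTime cluster 11)]
;;   (check-consistency cluster "test"))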
(defmacro letlocals
[& body]
(let [[tobind lexpr] (split-at (dec (count body)) body)
binded (vec (mapcat (fn [e]
(if (and (list? e) (= 'bind (first e)))
[(second e) (last e)]
['_ e]
))
tobind))]
`(let ~binded
~(first lexpr))))
(deftest test-isolated-assignment
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 6)
(.withINimbus (isolation-nimbus))
(.withDaemonConf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
STORM-SCHEDULER "org.apache.storm.scheduler.IsolationScheduler"
ISOLATION-SCHEDULER-MACHINES {"tester1" 3 "tester2" 2}
NIMBUS-MONITOR-FREQ-SECS 10
})))]
(letlocals
(bind state (.getClusterState cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. false) (Integer. 3))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 5))
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.))}))
(.submitTopology cluster "noniso" {TOPOLOGY-WORKERS 4} topology)
(.advanceClusterTime cluster 11)
(is (= 4 (topology-num-nodes state "noniso")))
(is (= 4 (storm-num-workers state "noniso")))
(.submitTopology cluster "tester1" {TOPOLOGY-WORKERS 6} topology)
(.submitTopology cluster "tester2" {TOPOLOGY-WORKERS 6} topology)
(.advanceClusterTime cluster 11)
(bind task-info-tester1 (storm-component->task-info cluster "tester1"))
(bind task-info-tester2 (storm-component->task-info cluster "tester2"))
(is (= 1 (topology-num-nodes state "noniso")))
(is (= 3 (storm-num-workers state "noniso")))
(is (= {2 3} (topology-node-distribution state "tester1")))
(is (= {3 2} (topology-node-distribution state "tester2")))
(is (apply disjoint? (map (partial topology-nodes state) ["noniso" "tester1" "tester2"])))
(check-consistency cluster "tester1")
(check-consistency cluster "tester2")
(check-consistency cluster "noniso")
;;check that nothing gets reassigned
(bind tester1-slots (topology-slots state "tester1"))
(bind tester2-slots (topology-slots state "tester2"))
(bind noniso-slots (topology-slots state "noniso"))
(.advanceClusterTime cluster 20)
(is (= tester1-slots (topology-slots state "tester1")))
(is (= tester2-slots (topology-slots state "tester2")))
(is (= noniso-slots (topology-slots state "noniso")))
)))
(deftest test-zero-executor-or-tasks
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 6)
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [state (.getClusterState cluster)
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. false) (Integer. 3)
{TOPOLOGY-TASKS 0})}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 1)
{TOPOLOGY-TASKS 2})
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) nil
{TOPOLOGY-TASKS 5})})
_ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
_ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
(is (= 0 (count (task-info "1"))))
(is (= 2 (count (task-info "2"))))
(is (= 5 (count (task-info "3"))))
(is (= 2 (storm-num-workers state "mystorm"))) ;; because only 2 executors
)))
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(deftest test-executor-assignments
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3)
{TOPOLOGY-TASKS 5})}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 8)
{TOPOLOGY-TASKS 2})
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 3))})
_ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
_ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "mystorm")
executor-info (->> (storm-component->executor-info cluster "mystorm")
(map-val #(map executor->tasks %)))]
(check-consistency cluster "mystorm")
(is (= 5 (count (task-info "1"))))
(check-distribution (executor-info "1") [2 2 1])
(is (= 2 (count (task-info "2"))))
(check-distribution (executor-info "2") [1 1])
(is (= 3 (count (task-info "3"))))
(check-distribution (executor-info "3") [1 1 1])
)))
(deftest test-over-parallelism-assignment
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 2)
(.withPortsPerSupervisor 5)
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [state (.getClusterState cluster)
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 21))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 9))
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 2))
"4" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 10))})
_ (.submitTopology cluster "test" {TOPOLOGY-WORKERS 7} topology)
_ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "test")]
(check-consistency cluster "test")
(is (= 21 (count (task-info "1"))))
(is (= 9 (count (task-info "2"))))
(is (= 2 (count (task-info "3"))))
(is (= 10 (count (task-info "4"))))
(is (= 7 (storm-num-workers state "test")))
)))
(deftest test-topo-history
(let [group-mapper (Mockito/mock IGroupMappingServiceProvider)]
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 2)
(.withPortsPerSupervisor 5)
(.withGroupMapper group-mapper)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-ADMINS ["admin-user"]
NIMBUS-TASK-TIMEOUT-SECS 30
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0})))]
(.thenReturn (Mockito/when (.getGroups group-mapper (Mockito/anyObject))) #{"alice-group"})
(letlocals
(bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 4))}
{}))
(bind state (.getClusterState cluster))
; get topology history when there's no topology history
(let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) (System/getProperty "user.name")))))]
(log-message "Checking user " (System/getProperty "user.name") " " hist-topo-ids)
(is (= 0 (count hist-topo-ids))))
(.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
(bind storm-id (.get (.getTopoId state "test")))
(.advanceClusterTime cluster 5)
(is (not-nil? (.stormBase state storm-id nil)))
(is (not-nil? (.assignmentInfo state storm-id nil)))
(.killTopology (.getNimbus cluster) "test")
;; check that storm is deactivated but alive
(is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id nil))))
(is (not-nil? (.assignmentInfo state storm-id nil)))
(.advanceClusterTime cluster 35)
;; kill topology read on group
(.submitTopology cluster "killgrouptest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS ["alice-group"]} topology)
(bind storm-id-killgroup (.get (.getTopoId state "killgrouptest")))
(.advanceClusterTime cluster 5)
(is (not-nil? (.stormBase state storm-id-killgroup nil)))
(is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
(.killTopology (.getNimbus cluster) "killgrouptest")
;; check that storm is deactivated but alive
(is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id-killgroup nil))))
(is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
(.advanceClusterTime cluster 35)
;; kill topology can't read
(.submitTopology cluster "killnoreadtest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
(bind storm-id-killnoread (.get (.getTopoId state "killnoreadtest")))
(.advanceClusterTime cluster 5)
(is (not-nil? (.stormBase state storm-id-killnoread nil)))
(is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
(.killTopology (.getNimbus cluster) "killnoreadtest")
;; check that storm is deactivated but alive
(is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id-killnoread nil))))
(is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
(.advanceClusterTime cluster 35)
;; active topology can read
(.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
(.advanceClusterTime cluster 11)
(bind storm-id2 (.get (.getTopoId state "2test")))
(is (not-nil? (.stormBase state storm-id2 nil)))
(is (not-nil? (.assignmentInfo state storm-id2 nil)))
;; active topology can not read
(.submitTopology cluster "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice"]} topology)
(.advanceClusterTime cluster 11)
(bind storm-id3 (.get (.getTopoId state "testnoread")))
(is (not-nil? (.stormBase state storm-id3 nil)))
(is (not-nil? (.assignmentInfo state storm-id3 nil)))
;; active topology can read based on group
(.submitTopology cluster "testreadgroup" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS ["alice-group"]} topology)
(.advanceClusterTime cluster 11)
(bind storm-id4 (.get (.getTopoId state "testreadgroup")))
(is (not-nil? (.stormBase state storm-id4 nil)))
(is (not-nil? (.assignmentInfo state storm-id4 nil)))
;; at this point there is a mix of running and killed topologies; check per-user history visibility
(let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) (System/getProperty "user.name")))))]
(log-message "Checking user " (System/getProperty "user.name") " " hist-topo-ids)
(is (= 4 (count hist-topo-ids)))
(is (= storm-id2 (get hist-topo-ids 0)))
(is (= storm-id-killgroup (get hist-topo-ids 1)))
(is (= storm-id (get hist-topo-ids 2)))
(is (= storm-id4 (get hist-topo-ids 3))))
(let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "alice"))))]
(log-message "Checking user alice " hist-topo-ids)
(is (= 5 (count hist-topo-ids)))
(is (= storm-id2 (get hist-topo-ids 0)))
(is (= storm-id-killgroup (get hist-topo-ids 1)))
(is (= storm-id (get hist-topo-ids 2)))
(is (= storm-id3 (get hist-topo-ids 3)))
(is (= storm-id4 (get hist-topo-ids 4))))
(let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "admin-user"))))]
(log-message "Checking user admin-user " hist-topo-ids)
(is (= 6 (count hist-topo-ids)))
(is (= storm-id2 (get hist-topo-ids 0)))
(is (= storm-id-killgroup (get hist-topo-ids 1)))
(is (= storm-id-killnoread (get hist-topo-ids 2)))
(is (= storm-id (get hist-topo-ids 3)))
(is (= storm-id3 (get hist-topo-ids 4)))
(is (= storm-id4 (get hist-topo-ids 5))))
(let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "group-only-user"))))]
(log-message "Checking user group-only-user " hist-topo-ids)
(is (= 2 (count hist-topo-ids)))
(is (= storm-id-killgroup (get hist-topo-ids 0)))
(is (= storm-id4 (get hist-topo-ids 1))))))))
(deftest test-kill-storm
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 2)
(.withPortsPerSupervisor 5)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MS 0
NIMBUS-TASK-TIMEOUT-SECS 30
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 14))}
{}))
(bind state (.getClusterState cluster))
(.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
(bind storm-id (.get (.getTopoId state "test")))
(.advanceClusterTime cluster 15)
(is (not-nil? (.stormBase state storm-id nil)))
(is (not-nil? (.assignmentInfo state storm-id nil)))
(.killTopology (.getNimbus cluster) "test")
;; check that storm is deactivated but alive
(is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id nil))))
(is (not-nil? (.assignmentInfo state storm-id nil)))
(.advanceClusterTime cluster 18)
;; check that storm is deactivated but alive
(is (= 1 (count (.heartbeatStorms state))))
(.advanceClusterTime cluster 3)
(is (nil? (.stormBase state storm-id nil)))
(is (nil? (.assignmentInfo state storm-id nil)))
;; cleanup happens on monitoring thread
(.advanceClusterTime cluster 11)
(is (empty? (.heartbeatStorms state)))
;; TODO: check that code on nimbus was cleaned up locally...
(is (thrown? NotAliveException (.killTopology (.getNimbus cluster) "lalala")))
(.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology)
(.advanceClusterTime cluster 11)
(is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology)))
(.advanceClusterTime cluster 11)
(bind storm-id (.get (.getTopoId state "2test")))
(is (not-nil? (.stormBase state storm-id nil)))
(.killTopology (.getNimbus cluster) "2test")
(is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology)))
(.advanceClusterTime cluster 11)
(is (= 1 (count (.heartbeatStorms state))))
(.advanceClusterTime cluster 6)
(is (nil? (.stormBase state storm-id nil)))
(is (nil? (.assignmentInfo state storm-id nil)))
(.advanceClusterTime cluster 11)
(is (= 0 (count (.heartbeatStorms state))))
(.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
(bind storm-id3 (.get (.getTopoId state "test3")))
(.advanceClusterTime cluster 11)
;; this guarantees an immediate kill notification
(.killTopology (.getNimbus cluster) "test3")
(.advanceClusterTime cluster 41)
(is (nil? (.stormBase state storm-id3 nil)))
(is (nil? (.assignmentInfo state storm-id3 nil)))
(is (= 0 (count (.heartbeatStorms state))))
;; this guarantees that monitor thread won't trigger for 10 more seconds
(Time/advanceTimeSecs 11)
(.waitForIdle cluster)
(.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
(bind storm-id3 (.get (.getTopoId state "test3")))
(.advanceClusterTime cluster 11)
(bind executor-id (first (topology-executors cluster storm-id3)))
(do-executor-heartbeat cluster storm-id3 executor-id)
(.killTopology (.getNimbus cluster) "test3")
(.advanceClusterTime cluster 6)
(is (= 1 (count (.heartbeatStorms state))))
(.advanceClusterTime cluster 5)
(is (= 0 (count (.heartbeatStorms state))))
;; test kill with opts
(.submitTopology cluster "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
(.advanceClusterTime cluster 11)
(.killTopologyWithOpts (.getNimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs 10)))
(bind storm-id4 (.get (.getTopoId state "test4")))
(.advanceClusterTime cluster 9)
(is (not-nil? (.assignmentInfo state storm-id4 nil)))
(.advanceClusterTime cluster 2)
(is (nil? (.assignmentInfo state storm-id4 nil)))
)))
(deftest test-reassignment
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 2)
(.withPortsPerSupervisor 5)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 2))}
{}))
(bind state (.getClusterState cluster))
(.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
(.advanceClusterTime cluster 11)
(check-consistency cluster "test")
(bind storm-id (.get (.getTopoId state "test")))
(bind [executor-id1 executor-id2] (topology-executors cluster storm-id))
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(bind _ (log-message "ass1, t0: " (pr-str ass1)))
(bind _ (log-message "ass2, t0: " (pr-str ass2)))
(.advanceClusterTime cluster 30)
(bind _ (log-message "ass1, t30, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t30, pre beat: " (pr-str ass2)))
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
(bind _ (log-message "ass1, t30, post beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t30, post beat: " (pr-str ass2)))
(.advanceClusterTime cluster 13)
(bind _ (log-message "ass1, t43, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t43, pre beat: " (pr-str ass2)))
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
(do-executor-heartbeat cluster storm-id executor-id1)
(bind _ (log-message "ass1, t43, post beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t43, post beat: " (pr-str ass2)))
(.advanceClusterTime cluster 11)
(bind _ (log-message "ass1, t54, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t54, pre beat: " (pr-str ass2)))
(do-executor-heartbeat cluster storm-id executor-id1)
(bind _ (log-message "ass1, t54, post beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t54, post beat: " (pr-str ass2)))
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(check-consistency cluster "test")
;; have to wait an extra 10 seconds because nimbus may not
;; resynchronize its heartbeat times until monitor-freq secs later
(.advanceClusterTime cluster 11)
(bind _ (log-message "ass1, t65, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t65, pre beat: " (pr-str ass2)))
(do-executor-heartbeat cluster storm-id executor-id1)
(bind _ (log-message "ass1, t65, post beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t65, post beat: " (pr-str ass2)))
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(check-consistency cluster "test")
(.advanceClusterTime cluster 11)
(bind _ (log-message "ass1, t76, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t76, pre beat: " (pr-str ass2)))
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (not= ass2 (executor-assignment cluster storm-id executor-id2)))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(bind _ (log-message "ass1, t76, post beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t76, post beat: " (pr-str ass2)))
(check-consistency cluster "test")
(.advanceClusterTime cluster 31)
(is (not= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2))) ; tests launch timeout
(check-consistency cluster "test")
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind active-supervisor (.get_node ass2))
(.killSupervisor cluster active-supervisor)
(doseq [i (range 12)]
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
(.advanceClusterTime cluster 10)
)
;; tests that executors are not reassigned while they are heartbeating, even if the supervisor times out
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
(check-consistency cluster "test")
(.advanceClusterTime cluster 30)
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(is (not-nil? ass1))
(is (not-nil? ass2))
(is (not= active-supervisor (.get_node (executor-assignment cluster storm-id executor-id2))))
(is (not= active-supervisor (.get_node (executor-assignment cluster storm-id executor-id1))))
(check-consistency cluster "test")
(doseq [supervisor-id (.supervisors state nil)]
(.killSupervisor cluster supervisor-id))
(.advanceClusterTime cluster 90)
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(is (nil? ass1))
(is (nil? ass2))
(check-consistency cluster "test" :assigned? false)
(.addSupervisor cluster)
(.advanceClusterTime cluster 11)
(check-consistency cluster "test")
)))
(deftest test-reassignment-to-constrained-cluster
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 0)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(.addSupervisor cluster 1 "a")
(.addSupervisor cluster 1 "b")
(bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 2))}
{}))
(bind state (.getClusterState cluster))
(.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
(.advanceClusterTime cluster 11)
(check-consistency cluster "test")
(bind storm-id (.get (.getTopoId state "test")))
(bind [executor-id1 executor-id2] (topology-executors cluster storm-id))
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(.advanceClusterTime cluster 30)
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
(.advanceClusterTime cluster 13)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
;; with rpc reporting mode, only heartbeats from the killed supervisor will time out
(.killSupervisor cluster (.get_node ass2))
(do-executor-heartbeat cluster storm-id executor-id1)
(.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(check-consistency cluster "test")
(is (= 1 (storm-num-workers state "test")))
)))
(defn check-executor-distribution [slot-executors distribution]
(check-distribution (vals slot-executors) distribution))
(defn check-num-nodes [slot-executors num-nodes]
(let [nodes (->> slot-executors keys (map (fn [np] (.get_node np))) set)]
(is (= num-nodes (count nodes)))
))
(deftest test-reassign-squeezed-topology
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 1)
(.withPortsPerSupervisor 1)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 9))}
{}))
(bind state (.getClusterState cluster))
(.submitTopology cluster "test" {TOPOLOGY-WORKERS 4} topology) ; distribution should be 2, 2, 2, 3 ideally
(.advanceClusterTime cluster 11)
(bind storm-id (.get (.getTopoId state "test")))
(bind slot-executors (slot-assignments cluster storm-id))
(check-executor-distribution slot-executors [9])
(check-consistency cluster "test")
(.addSupervisor cluster 2)
(.advanceClusterTime cluster 11)
(bind slot-executors (slot-assignments cluster storm-id))
(bind executor->start (executor-start-times cluster storm-id))
(check-executor-distribution slot-executors [3 3 3])
(check-consistency cluster "test")
(.addSupervisor cluster 8)
;; this actually works for any time > 0, since zookeeper fires an event causing immediate reassignment
;; it doesn't work for time = 0 because the test isn't waiting for the cluster yet, so the check might happen before reassignment finishes
(.advanceClusterTime cluster 11)
(bind slot-executors2 (slot-assignments cluster storm-id))
(bind executor->start2 (executor-start-times cluster storm-id))
(check-executor-distribution slot-executors2 [2 2 2 3])
(check-consistency cluster "test")
(bind common (first (Utils/findOne (proxy [IPredicate] []
(test [[k v]] (= 3 (count v)))) slot-executors2)))
(is (not-nil? common))
(is (= (slot-executors2 common) (slot-executors common)))
;; check that start times are changed for everything but the common one
(bind same-executors (slot-executors2 common))
(bind changed-executors (apply concat (vals (dissoc slot-executors2 common))))
(doseq [t same-executors]
(is (= (executor->start t) (executor->start2 t))))
(doseq [t changed-executors]
(is (not= (executor->start t) (executor->start2 t))))
)))
(deftest test-get-owner-resource-summaries
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 1)
(.withPortsPerSupervisor 12)
(.withDaemonConf
{SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
})))]
(letlocals
;;test for 0-topology case
(.advanceClusterTime cluster 11)
(bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
(bind summary (first owner-resource-summaries))
(is (nil? summary))
;;test for 1-topology case
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
(.advanceClusterTime cluster 11)
(bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
(bind summary (first owner-resource-summaries))
(is (= (.get_total_workers summary) 3))
(is (= (.get_total_executors summary) 3))
(is (= (.get_total_topologies summary) 1))
;;test for many-topology case
(bind topology2 (Thrift/buildTopology
{"2" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 4))}
{}))
(bind topology3 (Thrift/buildTopology
{"3" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 5))}
{}))
(.submitTopology cluster
"test2"
{TOPOLOGY-WORKERS 4
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
(.submitTopology cluster
"test3"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
(.advanceClusterTime cluster 11)
(bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) nil))
(bind summary (first owner-resource-summaries))
(is (= (.get_total_workers summary) 10))
(is (= (.get_total_executors summary) 12))
(is (= (.get_total_topologies summary) 3))
;;test for specific owner
(bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) (System/getProperty "user.name")))
(bind summary (first owner-resource-summaries))
(is (= (.get_total_workers summary) 10))
(is (= (.get_total_executors summary) 12))
(is (= (.get_total_topologies summary) 3))
;;test for other user
(bind other-user (str "not-" (System/getProperty "user.name")))
(bind owner-resource-summaries (.getOwnerResourceSummaries (.getNimbus cluster) other-user))
(bind summary (first owner-resource-summaries))
(is (= (.get_total_workers summary) 0))
(is (= (.get_total_executors summary) 0))
(is (= (.get_total_topologies summary) 0))
)))
(deftest test-rebalance
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 1)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(bind state (.getClusterState cluster))
(.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} topology)
(.advanceClusterTime cluster 11)
(bind storm-id (.get (.getTopoId state "test")))
(.addSupervisor cluster 3)
(.addSupervisor cluster 3)
(.advanceClusterTime cluster 11)
(bind slot-executors (slot-assignments cluster storm-id))
;; check that all workers are on one machine
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 1)
(.rebalance (.getNimbus cluster) "test" (RebalanceOptions.))
(.advanceClusterTime cluster 30)
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 1)
(.advanceClusterTime cluster 30)
(bind slot-executors (slot-assignments cluster storm-id))
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 3)
(is (thrown? InvalidTopologyException
(.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" (int 0)})
))))
)))
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(deftest test-rebalance-change-parallelism
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 4)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 6)
{TOPOLOGY-TASKS 12})}
{}))
(bind state (.getClusterState cluster))
(.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
(.advanceClusterTime cluster 11)
(bind storm-id (.get (.getTopoId state "test")))
(bind checker (fn [distribution]
(check-executor-distribution
(slot-assignments cluster storm-id)
distribution)))
(checker [2 2 2])
(.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_workers (int 6))
))
(.advanceClusterTime cluster 29)
(checker [2 2 2])
(.advanceClusterTime cluster 3)
(checker [1 1 1 1 1 1])
(.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" (int 1)})
))
(.advanceClusterTime cluster 29)
(checker [1 1 1 1 1 1])
(.advanceClusterTime cluster 3)
(checker [1])
(.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" (int 8)})
(.set_num_workers 4)
))
(.advanceClusterTime cluster 32)
(checker [2 2 2 2])
(check-consistency cluster "test")
(bind executor-info (->> (storm-component->executor-info cluster "test")
(map-val #(map executor->tasks %))))
(check-distribution (executor-info "1") [2 2 2 2 1 1 1 1])
)))
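;; Verifies that no node+port slot is shared between topologies: merges each
;; topology's node -> ports map and asserts the ports on every node stay distinct.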
(defn check-for-collisions [state]
(log-message "Checking for collision")
(let [assignments (.assignments state nil)]
(log-message "Assignemts: " assignments)
(let [id->node->ports (into {} (for [id assignments
:let [executor->node+port (.get_executor_node_port (.assignmentInfo state id nil))
node+ports (set (.values executor->node+port))
node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [np node+ports] {(.get_node np) [(first (.get_port np))]}))]]
{id node->ports}))
_ (log-message "id->node->ports: " id->node->ports)
all-nodes (apply merge-with (fn [a b]
(let [ret (concat a b)]
(log-message "Can we combine " (pr-str a) " and " (pr-str b) " without collisions? " (apply distinct? ret) " => " (pr-str ret))
(is (apply distinct? ret))
(distinct ret)))
(.values id->node->ports))]
)))
(deftest test-rebalance-constrained-cluster
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withSupervisors 1)
(.withPortsPerSupervisor 4)
(.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(bind topology2 (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(bind topology3 (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(bind state (.getClusterState cluster))
(.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
(.submitTopology cluster
"test2"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
(.submitTopology cluster
"test3"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
(.advanceClusterTime cluster 11)
(check-for-collisions state)
(.rebalance (.getNimbus cluster) "test" (doto (RebalanceOptions.)
(.set_num_workers 4)
(.set_wait_secs 0)
))
(.advanceClusterTime cluster 11)
(check-for-collisions state)
(.advanceClusterTime cluster 30)
(check-for-collisions state)
)))
(deftest test-submit-invalid
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withSimulatedTime)
(.withDaemonConf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
NIMBUS-EXECUTORS-PER-TOPOLOGY 8
NIMBUS-SLOTS-PER-TOPOLOGY 8})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 1)
{TOPOLOGY-TASKS 1})}
{}))
(is (thrown? InvalidTopologyException
(.submitTopology cluster
"test/aaa"
{}
topology)))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 16)
{TOPOLOGY-TASKS 16})}
{}))
(bind state (.getClusterState cluster))
(is (thrown? InvalidTopologyException
(.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3}
topology)))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 5)
{TOPOLOGY-TASKS 5})}
{}))
(is (thrown? InvalidTopologyException
(.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 16}
topology))))))
(deftest test-clean-inbox
"Tests that the inbox correctly cleans jar files."
(with-open [_ (Time$SimulatedTime.)
tmp-path (TmpPath. )]
(let [dir-location (.getPath tmp-path)
dir (File. dir-location)
mk-file (fn [name seconds-ago]
(let [f (File. (str dir-location "/" name))
t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
(FileUtils/touch f)
(.setLastModified f t)))
assert-files-in-dir (fn [compare-file-names]
(let [file-names (map #(.getName %) (file-seq dir))]
(is (= (sort compare-file-names)
(sort (filter #(.endsWith % ".jar") file-names))
))))]
;; Make three files a.jar, b.jar, c.jar.
;; a and b are older than c and should be deleted first.
(Time/advanceTimeSecs 100)
(doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
(apply mk-file fs))
(assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
(Nimbus/cleanInbox dir-location 10)
(assert-files-in-dir ["c.jar"])
;; Clean it again, c.jar should stay
(Time/advanceTimeSecs 5)
(Nimbus/cleanInbox dir-location 10)
(assert-files-in-dir ["c.jar"])
;; Advance time, clean again, c.jar should be deleted.
(Time/advanceTimeSecs 5)
(Nimbus/cleanInbox dir-location 10)
(assert-files-in-dir [])
)))
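;; Polls the cluster info until the topology named `name` reports the given status,
;; sleeping 100ms between checks and giving up after 5 seconds.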
(defn wait-for-status [nimbus name status]
(Testing/whileTimeout 5000
(reify Testing$Condition
(exec [this]
(let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
(log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
(not= topo-status status))))
(fn [] (Thread/sleep 100))))
(deftest test-leadership
"Tests that leader actions can only be performed by master and non leader fails to perform the same actions."
(with-open [zk (InProcessZookeeper. )]
(with-open [tmp-nimbus-dir (TmpPath.)
_ (MockedZookeeper. (proxy [Zookeeper] []
(zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
(let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir}))
(bind ass-backend (LocalAssignmentsBackendFactory/getDefault))
(bind cluster-state (ClusterUtils/mkStormClusterState conf ass-backend (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil cluster-state))
(.launchServer nimbus)
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
(zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. false))))]
(letlocals
(bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf ass-backend (ClusterStateContext.)))
(bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil non-leader-cluster-state))
(.launchServer non-leader-nimbus)
;; First we verify that the master nimbus can perform all actions, even with another nimbus present.
(.submitTopology nimbus "t1" nil "{}" topology)
;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
(.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
(wait-for-status nimbus "t1" "ACTIVE")
(.deactivate nimbus "t1")
(.activate nimbus "t1")
(.rebalance nimbus "t1" (RebalanceOptions.))
(.killTopology nimbus "t1")
;; Now we verify that the non-leader nimbus cannot perform any of the actions.
(is (thrown? RuntimeException
(.submitTopology non-leader-nimbus
"failing"
nil
"{}"
topology)))
(is (thrown? RuntimeException
(.killTopology non-leader-nimbus
"t1")))
(is (thrown? RuntimeException
(.activate non-leader-nimbus "t1")))
(is (thrown? RuntimeException
(.deactivate non-leader-nimbus "t1")))
(is (thrown? RuntimeException
(.rebalance non-leader-nimbus "t1" (RebalanceOptions.))))
(.shutdown non-leader-nimbus)
(.disconnect non-leader-cluster-state)
))
(.shutdown nimbus)
(.disconnect cluster-state))))))
(deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
(with-open [cluster (.build (doto (LocalCluster$Builder. )
(.withDaemonConf {NIMBUS-AUTHORIZER
"org.apache.storm.security.auth.authorizer.DenyAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
(let [
topology (Thrift/buildTopology {} {})
]
(is (thrown? AuthorizationException
(.submitTopologyWithOpts cluster "mystorm" {} topology
(SubmitOptions. TopologyInitialStatus/INACTIVE))
))
)
)
)
(deftest test-nimbus-iface-methods-check-authorization
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
topology-name "test"
topology-id "test-id"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
(is (thrown? AuthorizationException
(.rebalance nimbus topology-name (RebalanceOptions.))))
(is (thrown? AuthorizationException
(.activate nimbus topology-name)))
(is (thrown? AuthorizationException
(.deactivate nimbus topology-name)))))))
(deftest test-nimbus-check-authorization-params
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
topology-name "test-nimbus-check-autho-params"
topology-id "fake-id"
topology (Thrift/buildTopology {} {})
expected-name topology-name
expected-conf {TOPOLOGY-NAME expected-name
"foo" "bar"}]
(.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) expected-conf)
(.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/anyObject))) nil)
(testing "getTopologyConf calls check-authorization! with the correct parameters."
(let [expected-operation "getTopologyConf"]
(try
(is (= expected-conf
(->> (.getTopologyConf nimbus topology-id)
JSONValue/parse
clojurify-structure)))
(catch NotAliveException e)
(finally
(.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo")
(.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))))))
(testing "getTopology calls check-authorization! with the correct parameters."
(let [expected-operation "getTopology"
common-spy (->>
(proxy [StormCommon] []
(systemTopologyImpl [conf topology] nil))
Mockito/spy)]
(with-open [- (StormCommonInstaller. common-spy)]
(try
(.getTopology nimbus topology-id)
(catch NotAliveException e)
(finally
(.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
(. (Mockito/verify common-spy)
(systemTopologyImpl (Matchers/any Map)
(Matchers/any))))))))
(testing "getUserTopology calls check-authorization with the correct parameters."
(let [expected-operation "getUserTopology"]
(try
(.getUserTopology nimbus topology-id)
(catch NotAliveException e)
(finally
(.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
;; One for this call and one for the earlier getTopology call
(.readTopology (Mockito/verify tc (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
(deftest test-check-authorization-getSupervisorPageInfo
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
expected-name "test-nimbus-check-autho-params"
expected-conf {TOPOLOGY-NAME expected-name
TOPOLOGY-WORKERS 1
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
"foo" "bar"}
expected-operation "getTopology"
assignment (doto (Assignment.)
(.set_executor_node_port {[1 1] (NodeInfo. "super1" #{1}),
[2 2] (NodeInfo. "super2" #{2})}))
topology (doto (StormTopology. )
(.set_spouts {})
(.set_bolts {})
(.set_state_spouts {}))
topo-assignment {expected-name assignment}
check-auth-state (atom [])
mock-check-authorization (fn [nimbus storm-name storm-conf operation]
(swap! check-auth-state conj {:nimbus nimbus
:storm-name storm-name
:storm-conf storm-conf
:operation operation}))
all-supervisors (doto (HashMap.)
(.put "super1" (doto (SupervisorInfo.) (.set_hostname "host1") (.set_meta [(long 1234)])
(.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {})))
(.put "super2" (doto (SupervisorInfo.) (.set_hostname "host2") (.set_meta [(long 1234)])
(.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {}))))]
(.thenReturn (Mockito/when (.allSupervisorInfo cluster-state)) all-supervisors)
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) expected-conf)
(.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
(.thenReturn (Mockito/when (.assignmentsInfo cluster-state)) topo-assignment)
(.getSupervisorPageInfo nimbus "super1" nil true)
        ;; checkAuthorization should be called with the topology name for both getSupervisorPageInfo and getTopology, plus once with null arguments for getClusterInfo
(.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getSupervisorPageInfo"))
(.checkAuthorization (Mockito/verify nimbus) nil nil "getClusterInfo")
(.checkAuthorization (Mockito/verify nimbus) (Mockito/eq expected-name) (Mockito/any Map) (Mockito/eq "getTopology"))))))
(deftest test-nimbus-iface-getTopology-methods-throw-correctly
(with-open [cluster (LocalCluster. )]
    (let [nimbus (.getNimbus cluster)
          id "bogus ID"]
      (is (thrown? NotAliveException (.getTopology nimbus id)))
      (try
        (.getTopology nimbus id)
        (catch NotAliveException e
          (is (= id (.get_msg e)))))
      (is (thrown? NotAliveException (.getTopologyConf nimbus id)))
      (try
        (.getTopologyConf nimbus id)
        (catch NotAliveException e
          (is (= id (.get_msg e)))))
      (is (thrown? NotAliveException (.getTopologyInfo nimbus id)))
      (try
        (.getTopologyInfo nimbus id)
        (catch NotAliveException e
          (is (= id (.get_msg e)))))
      (is (thrown? NotAliveException (.getUserTopology nimbus id)))
      (try
        (.getUserTopology nimbus id)
        (catch NotAliveException e
          (is (= id (.get_msg e))))))))
(defn mkStormBase [launch-time-secs storm-name status]
(doto (StormBase.)
(.set_name storm-name)
(.set_launch_time_secs (int launch-time-secs))
(.set_status status)))
(deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withTopoCache tc)
(.withBlobStore blob-store)))]
(let [nimbus (.getNimbus cluster)
bogus-secs 42
bogus-type TopologyStatus/ACTIVE
            bogus-bases {"1" nil
                         "2" (mkStormBase bogus-secs "id2-name" bogus-type)
                         "3" nil
                         "4" (mkStormBase bogus-secs "id4-name" bogus-type)}
topo-name "test-topo"
topo-conf {TOPOLOGY-NAME topo-name
TOPOLOGY-WORKERS 1
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
storm-base (StormBase. )
topology (doto (StormTopology. )
(.set_spouts {})
(.set_bolts {})
                       (.set_state_spouts {}))]
(.thenReturn (Mockito/when (.stormBase cluster-state (Mockito/any String) (Mockito/anyObject))) storm-base)
(.thenReturn (Mockito/when (.topologyBases cluster-state)) bogus-bases)
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) topo-conf)
(.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
        (let [topos (.get_topologies (.getClusterInfo nimbus))]
          ; The number of topologies in the summary is correct.
          (is (= (count (filter (fn [b] (second b)) bogus-bases))
                 (count topos)))
          ; Each topology present has a valid name.
          (is (empty?
                (filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos)))
          ; The topologies are those with valid bases.
          (is (empty?
                (filter (fn [t]
                          (or (nil? t)
                              (not (number? (read-string (.get_id t))))
                              (odd? (read-string (.get_id t)))))
                        topos))))))))
(deftest test-file-bogus-download
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(let [nimbus (.getNimbus cluster)]
(is (thrown-cause? IllegalArgumentException (.beginFileDownload nimbus nil)))
(is (thrown-cause? IllegalArgumentException (.beginFileDownload nimbus "")))
      (is (thrown-cause? IllegalArgumentException (.beginFileDownload nimbus "/bogus-path/foo"))))))
(deftest test-validate-topo-config-on-submit
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
(let [topology (Thrift/buildTopology {} {})
bad-config {"topology.isolate.machines" "2"}]
(is (thrown-cause? InvalidTopologyException
(.submitTopologyWithOpts cluster "test" bad-config topology
(SubmitOptions.))))))))
(deftest test-stateless-with-scheduled-topology-to-be-killed
  ; regression test for STORM-856
(with-open [zk (InProcessZookeeper. )]
(with-open [tmp-nimbus-dir (TmpPath. )]
(let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir}))
(bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
(.launchServer nimbus)
(Time/sleepSecs 1)
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(.submitTopology nimbus "t1" nil (str "{\"" TOPOLOGY-MESSAGE-TIMEOUT-SECS "\": 30}") topology)
      ; request that topology t1 be killed -> nimbus records this transition in cluster state
(.killTopology nimbus "t1")
      ; shut down nimbus immediately so it has no chance to handle the event
(.shutdown nimbus)
      ; on startup nimbus reads the cluster state and takes the proper actions;
      ; in this case it re-registers the pending topology transition with the scheduler
      ; before STORM-856 this startup path crashed nimbus with an NPE
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
(.launchServer nimbus)
(.shutdown nimbus)
        (.disconnect cluster-state))))))
(deftest test-topology-action-notifier
(with-open [zk (InProcessZookeeper. )]
(with-open [tmp-nimbus-dir (TmpPath.)
_ (MockedZookeeper. (proxy [Zookeeper] []
(zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
(let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir
NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
(bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
(.launchServer nimbus)
(bind notifier (InMemoryTopologyActionNotifier.))
(Time/sleepSecs 1)
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
(.submitTopology nimbus "test-notification" nil (str "{\"" TOPOLOGY-MESSAGE-TIMEOUT-SECS "\": 30}") topology)
(.deactivate nimbus "test-notification")
(.activate nimbus "test-notification")
(.rebalance nimbus "test-notification" (doto (RebalanceOptions.)
(.set_wait_secs 0)))
(.killTopologyWithOpts nimbus "test-notification" (doto (KillOptions.)
(.set_wait_secs 0)))
(.shutdown nimbus)
        ; ensure the notifier was invoked for each action, and in the correct order
(is (= ["submitTopology", "activate", "deactivate", "activate", "rebalance", "killTopology"]
(.getTopologyActions notifier "test-notification")))
        (.disconnect cluster-state))))))
(deftest test-debug-on-component
(with-open [cluster (LocalCluster. )]
(let [nimbus (.getNimbus cluster)
topology (Thrift/buildTopology
{"spout" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{})]
(.submitTopology cluster "t1" {TOPOLOGY-WORKERS 1} topology)
(.debug nimbus "t1" "spout" true 100))))
(deftest test-debug-on-global
(with-open [cluster (LocalCluster. )]
(let [nimbus (.getNimbus cluster)
topology (Thrift/buildTopology
{"spout" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{})]
(.submitTopology cluster "t1" {TOPOLOGY-WORKERS 1} topology)
(.debug nimbus "t1" "" true 100))))
;; if the user sends an empty log config, nimbus reports every log level
;; the topology already has as LogLevelAction/UNCHANGED
(deftest empty-save-config-results-in-all-unchanged-actions
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
previous-config (LogConfig.)
mock-config (LogConfig.)
expected-config (LogConfig.)]
;; send something with content to nimbus beforehand
(.put_to_named_logger_level previous-config "test"
(doto (LogLevel.)
(.set_target_log_level "ERROR")
(.set_action LogLevelAction/UPDATE)))
(.put_to_named_logger_level expected-config "test"
(doto (LogLevel.)
(.set_target_log_level "ERROR")
(.set_action LogLevelAction/UNCHANGED)))
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
(.setLogConfig nimbus "foo" mock-config)
(.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config) (Mockito/any Map))))))
(deftest log-level-update-merges-and-flags-existent-log-level
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
previous-config (LogConfig.)
mock-config (LogConfig.)
expected-config (LogConfig.)]
;; send something with content to nimbus beforehand
(.put_to_named_logger_level previous-config "test"
(doto (LogLevel.)
(.set_target_log_level "ERROR")
(.set_action LogLevelAction/UPDATE)))
(.put_to_named_logger_level previous-config "other-test"
(doto (LogLevel.)
(.set_target_log_level "DEBUG")
(.set_action LogLevelAction/UPDATE)))
;; only change "test"
(.put_to_named_logger_level mock-config "test"
(doto (LogLevel.)
(.set_target_log_level "INFO")
(.set_action LogLevelAction/UPDATE)))
(.put_to_named_logger_level expected-config "test"
(doto (LogLevel.)
(.set_target_log_level "INFO")
(.set_action LogLevelAction/UPDATE)))
(.put_to_named_logger_level expected-config "other-test"
(doto (LogLevel.)
(.set_target_log_level "DEBUG")
(.set_action LogLevelAction/UNCHANGED)))
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
(.setLogConfig nimbus "foo" mock-config)
(.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config) (Mockito/any Map))))))
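;; A rough sketch (not executed by the suite) of the merge rule the two log-config
;; tests above exercise, using a hypothetical plain-map model of LogConfig entries
;; rather than the thrift structs: levels named in the request keep their requested
;; action, while levels the topology already has but the request omits are reported
;; back as UNCHANGED.
(comment
  (defn merge-log-levels-sketch
    "Hypothetical helper: previous and requested map logger name -> {:level :action}."
    [previous requested]
    (merge (into {} (for [[logger lvl] previous]
                      [logger (assoc lvl :action :unchanged)]))
           requested))
  (merge-log-levels-sketch
    {"test"       {:level "ERROR" :action :update}
     "other-test" {:level "DEBUG" :action :update}}
    {"test"       {:level "INFO" :action :update}}))
;; => {"test" {:level "INFO" :action :update},
;;     "other-test" {:level "DEBUG" :action :unchanged}}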
(defn mock-cluster-state
([]
(mock-cluster-state nil nil))
([active-topos inactive-topos]
(mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos inactive-topos))
([active-topos hb-topos error-topos bp-topos]
(mock-cluster-state active-topos hb-topos error-topos bp-topos nil))
([active-topos hb-topos error-topos bp-topos wt-topos]
(let [cluster-state (Mockito/mock IStormClusterState)]
(.thenReturn (Mockito/when (.activeStorms cluster-state)) active-topos)
(.thenReturn (Mockito/when (.heartbeatStorms cluster-state)) hb-topos)
(.thenReturn (Mockito/when (.errorTopologies cluster-state)) error-topos)
(.thenReturn (Mockito/when (.backpressureTopologies cluster-state)) bp-topos)
(.thenReturn (Mockito/when (.idsOfTopologiesWithPrivateWorkerKeys cluster-state)) (into #{} wt-topos))
cluster-state)))
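;; REPL-only usage sketch for mock-cluster-state (not part of the tests): the
;; 2-arity re-uses its second argument as the heartbeat, error, backpressure and
;; worker-key topology lists all at once, while the 4- and 5-arity versions let
;; each list differ.
(comment
  ;; topo1 is active; topo2 and topo3 only have stale state left behind
  (def state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3")))
  (.activeStorms state)       ;; => ("topo1")
  (.heartbeatStorms state))   ;; => ("topo1" "topo2" "topo3")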
(deftest cleanup-storm-ids-returns-inactive-topos
(let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))
store (Mockito/mock BlobStore)]
(.thenReturn (Mockito/when (.storedTopoIds store)) #{})
(is (= (Nimbus/topoIdsToClean mock-state store {NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MS 0}) #{"topo2" "topo3"}))))
(deftest cleanup-storm-ids-performs-union-of-storm-ids-with-active-znodes
(let [active-topos (list "hb1" "e2" "bp3")
hb-topos (list "hb1" "hb2" "hb3")
error-topos (list "e1" "e2" "e3")
bp-topos (list "bp1" "bp2" "bp3")
mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)
store (Mockito/mock BlobStore)]
(.thenReturn (Mockito/when (.storedTopoIds store)) #{})
(is (= (Nimbus/topoIdsToClean mock-state store {NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MS 0})
#{"hb2" "hb3" "e1" "e3" "bp1" "bp2"}))))
(deftest cleanup-storm-ids-returns-empty-set-when-all-topos-are-active
(let [active-topos (list "hb1" "hb2" "hb3" "e1" "e2" "e3" "bp1" "bp2" "bp3")
hb-topos (list "hb1" "hb2" "hb3")
error-topos (list "e1" "e2" "e3")
bp-topos (list "bp1" "bp2" "bp3")
mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)
store (Mockito/mock BlobStore)]
(.thenReturn (Mockito/when (.storedTopoIds store)) #{})
(is (= (Nimbus/topoIdsToClean mock-state store (new java.util.HashMap))
#{}))))
(deftest do-cleanup-removes-inactive-znodes
(let [inactive-topos (list "topo2" "topo3")
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
conf {NIMBUS-MONITOR-FREQ-SECS 10 NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MS 0}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
(zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil (StormMetricsRegistry.)))]
(.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo2")
(.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo3")
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
(.doCleanup nimbus)
;; removed heartbeats znode
(.teardownHeartbeats (Mockito/verify mock-state) "topo2")
(.teardownHeartbeats (Mockito/verify mock-state) "topo3")
;; removed topo errors znode
(.teardownTopologyErrors (Mockito/verify mock-state) "topo2")
(.teardownTopologyErrors (Mockito/verify mock-state) "topo3")
;; removed topo directories
(.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo2")
(.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo3")
;; removed blob store topo keys
(.rmTopologyKeys (Mockito/verify nimbus) "topo2")
(.rmTopologyKeys (Mockito/verify nimbus) "topo3")
;; removed topology dependencies
(.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo2")
(.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo3")
;; remove topos from heartbeat cache
(is (= (.getNumToposCached (.getHeartbeatsCache nimbus)) 0))))))
(deftest do-cleanup-does-not-teardown-active-topos
(let [inactive-topos ()
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
conf {NIMBUS-MONITOR-FREQ-SECS 10}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
(zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil (StormMetricsRegistry.)))]
(.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo1")
(.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo2")
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
(.doCleanup nimbus)
(.teardownHeartbeats (Mockito/verify mock-state (Mockito/never)) (Mockito/any))
(.teardownTopologyErrors (Mockito/verify mock-state (Mockito/never)) (Mockito/any))
(.forceDeleteTopoDistDir (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject))
(.rmTopologyKeys (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject))
        ;; hb-cache still contains both topos because neither was inactive
(is (= (.getNumToposCached (.getHeartbeatsCache nimbus)) 2))
(is (contains? (.getTopologyIds (.getHeartbeatsCache nimbus)) "topo1"))
(is (contains? (.getTopologyIds (.getHeartbeatsCache nimbus)) "topo2"))))))
(deftest user-topologies-for-supervisor
(let [assignment (doto (Assignment.)
(.set_executor_node_port {[1 1] (NodeInfo. "super1" #{1}),
[2 2] (NodeInfo. "super2" #{2})}))
assignment2 (doto (Assignment.)
(.set_executor_node_port {[1 1] (NodeInfo. "super2" #{2}),
[2 2] (NodeInfo. "super2" #{2})}))
assignments {"topo1" assignment, "topo2" assignment2}
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
mock-tc (Mockito/mock TopoCache)
nimbus (Nimbus. {NIMBUS-MONITOR-FREQ-SECS 10} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil (StormMetricsRegistry.))]
(let [supervisor1-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
user1-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor1-topologies))
supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2"))
user2-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor2-topologies))]
(is (= (list "topo1") supervisor1-topologies))
(is (= #{"topo1"} user1-topologies))
(is (= (list "topo1" "topo2") supervisor2-topologies))
(is (= #{"topo1" "topo2"} user2-topologies)))))
(deftest user-topologies-for-supervisor-with-unauthorized-user
(let [assignment (doto (Assignment.)
(.set_executor_node_port {[1 1] (NodeInfo. "super1" #{1}),
[2 2] (NodeInfo. "super2" #{2})}))
assignment2 (doto (Assignment.)
(.set_executor_node_port {[1 1] (NodeInfo. "super1" #{2}),
[2 2] (NodeInfo. "super2" #{2})}))
assignments {"topo1" assignment, "authorized" assignment2}
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
mock-tc (Mockito/mock TopoCache)
nimbus (Nimbus. {NIMBUS-MONITOR-FREQ-SECS 10} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil (StormMetricsRegistry.))]
(.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
(.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
(.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME)))))
(let [supervisor-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
user-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor-topologies))]
(is (= (list "topo1" "authorized") supervisor-topologies))
(is (= #{"authorized"} user-topologies)))))