blob: d5588d13a500a436a77caf46eb59430c4b4bf9d2 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.cluster-test
(:import [java.util Arrays]
[org.apache.storm.nimbus NimbusInfo])
(:import [org.apache.storm.generated SupervisorInfo StormBase Assignment NimbusSummary TopologyStatus NodeInfo Credentials])
(:import [org.apache.storm.shade.org.apache.zookeeper ZooDefs ZooDefs$Ids Watcher$Event$EventType])
(:import [org.mockito Mockito])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.storm.shade.org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
(:import [org.apache.storm.utils Time Time$SimulatedTime ZookeeperAuthInfo ConfigUtils Utils CuratorUtils])
(:import [org.apache.storm.cluster IStateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
(:import [org.apache.storm.zookeeper Zookeeper ClientZookeeper])
(:import [org.apache.storm.callback ZKStateChangedCallback])
(:import [org.apache.storm.testing InProcessZookeeper])
(:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster MockedClientZookeeper])
(:require [conjure.core])
(:use [conjure core])
(:use [clojure test])
(:use [org.apache.storm config util log]))
(defn mk-config [zk-port]
(merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]}))
(defn mk-state
([zk-port] (let [conf (mk-config zk-port)]
(ClusterUtils/mkStateStorage conf conf (ClusterStateContext.))))
([zk-port cb]
(let [ret (mk-state zk-port)]
(.register ret cb)
ret)))
(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) (ClusterStateContext.)))
(defn barr
[& vals]
(byte-array (map byte vals)))
(deftest test-basics
(with-open [zk (InProcessZookeeper. )]
(let [state (mk-state (.getPort zk))]
(.set-data state "/root" (barr 1 2 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1 2 3) (.get-data state "/root" false)))
(is (= nil (.get-data state "/a" false)))
(.set-data state "/root/a" (barr 1 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(.set-data state "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state "/root" false)))
(is (Arrays/equals (barr 1 2) (.get-data state "/root/a" false)))
(.set-data state "/a/b/c/d" (barr 99) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 99) (.get-data state "/a/b/c/d" false)))
(.mkdirs state "/lalala" ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= [] (.get-children state "/lalala" false)))
(is (= #{"root" "a" "lalala"} (set (.get-children state "/" false))))
(.delete-node state "/a")
(is (= #{"root" "lalala"} (set (.get-children state "/" false))))
(is (= nil (.get-data state "/a/b/c/d" false)))
(.close state)
)))
(deftest test-multi-state
(with-open [zk (InProcessZookeeper. )]
(let [state1 (mk-state (.getPort zk))
state2 (mk-state (.getPort zk))]
(.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state1 "/root" false)))
(is (Arrays/equals (barr 1) (.get-data state2 "/root" false)))
(.delete-node state2 "/root")
(is (= nil (.get-data state1 "/root" false)))
(is (= nil (.get-data state2 "/root" false)))
(.close state1)
(.close state2)
)))
(deftest test-ephemeral
(with-open [zk (InProcessZookeeper. )]
(let [state1 (mk-state (.getPort zk))
state2 (mk-state (.getPort zk))
state3 (mk-state (.getPort zk))]
(.set-ephemeral-node state1 "/a" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
(is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))
(.close state3)
(is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
(is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))
(.close state1)
(is (= nil (.get-data state2 "/a" false)))
(.close state2)
)))
(defn mk-callback-tester []
(let [last (atom nil)
cb (reify
ZKStateChangedCallback
(changed
[this type path]
(reset! last {:type type :path path})))]
[last cb]
))
(defn read-and-reset! [aatom]
(let [time (System/currentTimeMillis)]
(loop []
(if-let [val @aatom]
(do
(reset! aatom nil)
val)
(do
(when (> (- (System/currentTimeMillis) time) 30000)
(throw (RuntimeException. "Waited too long for atom to change state")))
(Thread/sleep 10)
(recur))
))))
(deftest test-callbacks
(with-open [zk (InProcessZookeeper. )]
(let [[state1-last-cb state1-cb] (mk-callback-tester)
state1 (mk-state (.getPort zk) state1-cb)
[state2-last-cb state2-cb] (mk-callback-tester)
state2 (mk-state (.getPort zk) state2-cb)]
(.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(.get-data state2 "/root" true)
(is (= nil @state1-last-cb))
(is (= nil @state2-last-cb))
(.set-data state2 "/root" (barr 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type Watcher$Event$EventType/NodeDataChanged :path "/root"} (read-and-reset! state2-last-cb)))
(is (= nil @state1-last-cb))
(.set-data state2 "/root" (barr 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= nil @state2-last-cb))
(.get-data state2 "/root" true)
(.get-data state2 "/root" false)
(.delete-node state1 "/root")
(is (= {:type Watcher$Event$EventType/NodeDeleted :path "/root"} (read-and-reset! state2-last-cb)))
(.get-data state2 "/root" true)
(.set-ephemeral-node state1 "/root" (barr 1 2 3 4) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type Watcher$Event$EventType/NodeCreated :path "/root"} (read-and-reset! state2-last-cb)))
(.get-children state1 "/" true)
(.set-data state2 "/a" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= nil @state2-last-cb))
(is (= {:type Watcher$Event$EventType/NodeChildrenChanged :path "/"} (read-and-reset! state1-last-cb)))
(.get-data state2 "/root" true)
(.set-ephemeral-node state1 "/root" (barr 1 2) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type Watcher$Event$EventType/NodeDataChanged :path "/root"} (read-and-reset! state2-last-cb)))
(.mkdirs state1 "/ccc" ZooDefs$Ids/OPEN_ACL_UNSAFE)
(.get-children state1 "/ccc" true)
(.get-data state2 "/ccc/b" true)
(.set-data state2 "/ccc/b" (barr 8) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type Watcher$Event$EventType/NodeCreated :path "/ccc/b"} (read-and-reset! state2-last-cb)))
(is (= {:type Watcher$Event$EventType/NodeChildrenChanged :path "/ccc"} (read-and-reset! state1-last-cb)))
(.get-data state2 "/root" true)
(.get-data state2 "/root2" true)
(.close state1)
(is (= {:type Watcher$Event$EventType/NodeDeleted :path "/root"} (read-and-reset! state2-last-cb)))
(.set-data state2 "/root2" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (= {:type Watcher$Event$EventType/NodeCreated :path "/root2"} (read-and-reset! state2-last-cb)))
(.close state2)
)))
(defn mkAssignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources]
(doto (Assignment.)
(.set_executor_node_port executor->node+port)
(.set_executor_start_time_secs executor->start-time-secs)
(.set_worker_resources worker->resources)
(.set_node_host node->host)
(.set_master_code_dir master-code-dir)))
(defn mkStormBase [storm-name launch-time-secs status num-workers]
(doto (StormBase.)
(.set_name storm-name)
(.set_launch_time_secs (int launch-time-secs))
(.set_status status)
(.set_num_workers (int num-workers))))
(deftest test-storm-cluster-state-basics
(with-open [zk (InProcessZookeeper. )]
(let [state (mk-storm-state (.getPort zk))
assignment1 (mkAssignment "/aaa" {} {[1] (NodeInfo. "1" #{1001 1})} {} {})
assignment2 (mkAssignment "/aaa" {} {[2] (NodeInfo. "2" #{2002})} {} {})
nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false)
nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (Time/currentTimeSecs) false "v1")
nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (Time/currentTimeSecs) false "v2")
base1 (mkStormBase "/tmp/storm1" 1 TopologyStatus/ACTIVE 2)
base2 (mkStormBase "/tmp/storm2" 2 TopologyStatus/ACTIVE 2)]
(is (= [] (.assignments state nil)))
(.setAssignment state "storm1" assignment1 {})
(is (= assignment1 (.assignmentInfo state "storm1" nil)))
(is (= nil (.assignmentInfo state "storm3" nil)))
(.setAssignment state "storm1" assignment2 {})
(.setAssignment state "storm3" assignment1 {})
(is (= #{"storm1" "storm3"} (set (.assignments state nil))))
(is (= assignment2 (.assignmentInfo state "storm1" nil)))
(is (= assignment1 (.assignmentInfo state "storm3" nil)))
(is (= [] (.activeStorms state)))
(.activateStorm state "storm1" base1 {})
(is (= ["storm1"] (.activeStorms state)))
(is (= base1 (.stormBase state "storm1" nil)))
(is (= nil (.stormBase state "storm2" nil)))
(.activateStorm state "storm2" base2 {})
(is (= base1 (.stormBase state "storm1" nil)))
(is (= base2 (.stormBase state "storm2" nil)))
(is (= #{"storm1" "storm2"} (set (.activeStorms state))))
(.removeStormBase state "storm1")
(is (= base2 (.stormBase state "storm2" nil)))
(is (= #{"storm2"} (set (.activeStorms state))))
(is (nil? (.credentials state "storm1" nil)))
(.setCredentials state "storm1" (doto (Credentials. ) (.set_creds {"a" "a"})) {})
(is (= {"a" "a"} (.get_creds (.credentials state "storm1" nil))))
(.setCredentials state "storm1" (doto (Credentials. ) (.set_creds {"b" "b"})) {})
(is (= {"b" "b"} (.get_creds (.credentials state "storm1" nil))))
(is (= [] (.blobstoreInfo state "")))
(.setupBlob state "key1" nimbusInfo1 (Integer/parseInt "1"))
(is (= ["key1"] (.blobstoreInfo state "")))
(is (= [(str (.toHostPortString nimbusInfo1) "-1")] (.blobstoreInfo state "key1")))
(.setupBlob state "key1" nimbusInfo2 (Integer/parseInt "1"))
(is (= #{(str (.toHostPortString nimbusInfo1) "-1")
(str (.toHostPortString nimbusInfo2) "-1")} (set (.blobstoreInfo state "key1"))))
(.removeBlobstoreKey state "key1")
(is (= [] (.blobstoreInfo state "")))
(is (= [] (.nimbuses state)))
(.addNimbusHost state "nimbus1:port" nimbusSummary1)
(is (= [nimbusSummary1] (.nimbuses state)))
(.addNimbusHost state "nimbus2:port" nimbusSummary2)
(is (= #{nimbusSummary1 nimbusSummary2} (set (.nimbuses state))))
;; TODO add tests for task info and task heartbeat setting and getting
(.disconnect state)
)))
(defn- validate-errors! [state storm-id component errors-list]
(let [errors (map clojurify-error (.errors state storm-id component))]
(is (= (count errors) (count errors-list)))
(doseq [[error target] (map vector errors errors-list)]
(when-not (.contains (:error error) target)
(println target " => " (:error error)))
(is (.contains (:error error) target))
)))
(defn- stringify-error [error]
(let [result (java.io.StringWriter.)
printer (java.io.PrintWriter. result)]
(.printStackTrace error printer)
(.toString result)))
(deftest test-storm-cluster-state-errors
(with-open [zk (InProcessZookeeper. )]
(with-open [_ (Time$SimulatedTime. )]
(let [state (mk-storm-state (.getPort zk))]
(.reportError state "a" "1" (Utils/localHostname) 6700 (RuntimeException.))
(validate-errors! state "a" "1" ["RuntimeException"])
(Time/advanceTimeSecs 1)
(.reportError state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.))
(validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])
(doseq [i (range 10)]
(.reportError state "a" "2" (Utils/localHostname) 6700 (RuntimeException.))
(Time/advanceTimeSecs 2))
(validate-errors! state "a" "2" (repeat 10 "RuntimeException"))
(doseq [i (range 5)]
(.reportError state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.))
(Time/advanceTimeSecs 2))
(validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
(repeat 5 "RuntimeException")
))
(.disconnect state)
))))
(defn mkSupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map]
(doto (SupervisorInfo.)
(.set_time_secs time-secs)
(.set_hostname hostname)
(.set_assignment_id assignment-id)
(.set_used_ports used-ports)
(.set_meta meta)
(.set_scheduler_meta scheduler-meta)
(.set_uptime_secs uptime-secs)
(.set_version version)
(.set_resources_map resources-map)))
(deftest test-supervisor-state
(with-open [zk (InProcessZookeeper. )]
(let [state1 (mk-storm-state (.getPort zk))
state2 (mk-storm-state (.getPort zk))
supervisor-info1 (mkSupervisorInfo 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil)
supervisor-info2 (mkSupervisorInfo 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)]
(is (= [] (.supervisors state1 nil)))
(.supervisorHeartbeat state2 "2" supervisor-info2)
(.supervisorHeartbeat state1 "1" supervisor-info1)
(is (= supervisor-info2 (.supervisorInfo state1 "2")))
(is (= supervisor-info1 (.supervisorInfo state1 "1")))
(is (= #{"1" "2"} (set (.supervisors state1 nil))))
(is (= #{"1" "2"} (set (.supervisors state2 nil))))
(.disconnect state2)
(is (= #{"1"} (set (.supervisors state1 nil))))
(.disconnect state1)
)))
(deftest test-cluster-authentication
(with-open [zk (InProcessZookeeper. )]
(let [builder (Mockito/mock CuratorFrameworkFactory$Builder)
conf (merge
(mk-config (.getPort zk))
{STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
STORM-ZOOKEEPER-SESSION-TIMEOUT 10
STORM-ZOOKEEPER-RETRY-INTERVAL 5
STORM-ZOOKEEPER-RETRY-TIMES 2
STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15
STORM-ZOOKEEPER-AUTH-SCHEME "digest"
STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})]
(. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
(. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
(. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
(CuratorUtils/testSetupBuilder builder (str (.getPort zk) "/") conf (ZookeeperAuthInfo. conf))
(is (nil?
(try
(. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
(catch MockitoAssertionError e
e)))))))
(deftest test-storm-state-callbacks
;; TODO finish
)
(deftest test-cluster-state-default-acls
(testing "The default ACLs are empty."
(let [zk-mock (Mockito/mock ClientZookeeper)
curator-frameworke (reify CuratorFramework (^void close [this] nil))]
;; No need for when clauses because we just want to return nil
(with-open [_ (MockedClientZookeeper. zk-mock)]
(. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/any) (Mockito/any) (Mockito/anyString) (Mockito/any)
(Mockito/any) (Mockito/any))) (thenReturn curator-frameworke))
(ClusterUtils/mkStateStorage {} nil (ClusterStateContext.))
(.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))
(let [distributed-state-storage (reify IStateStorage
(register [this callback] nil)
(mkdirs [this path acls] nil))
cluster-utils (Mockito/mock ClusterUtils)]
(with-open [mocked-cluster (MockedCluster. cluster-utils)]
(. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/any))) (thenReturn distributed-state-storage))
(ClusterUtils/mkStormClusterState {} (ClusterStateContext.))))))