;; blob: 63efd30f4c76e2e5298c81bef8f43246f5b48305 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.cluster-test
(:import [java.util Arrays])
(:import [backtype.storm.daemon.common Assignment StormBase SupervisorInfo])
(:use [clojure test])
(:use [backtype.storm cluster config util testing]))
;; Builds the configuration used by the tests in this file: starts from
;; (read-storm-config) and merges further settings on top of it.
;; NOTE(review): the remaining argument(s) to `merge` and the closing
;; delimiters of this form are not present below. Presumably an override map
;; derived from zk-port belongs here (zk-port is otherwise unused) -- TODO
;; confirm against the upstream file and restore.
(defn mk-config [zk-port]
(merge (read-storm-config)
;; Creates a distributed cluster state from the config produced by mk-config.
;;   [zk-port]    - returns the freshly created state.
;;   [zk-port cb] - additionally registers cb on the state via .register and
;;                  returns that same state.
(defn mk-state
  ([zk-port]
   (mk-distributed-cluster-state (mk-config zk-port)))
  ([zk-port cb]
   ;; doto evaluates (.register state cb) and then yields the state itself.
   (doto (mk-state zk-port)
     (.register cb))))
;; Creates a storm cluster state from the config produced by mk-config.
(defn mk-storm-state [zk-port]
  (-> zk-port
      mk-config
      mk-storm-cluster-state))
;; Exercises the basic node operations of a single cluster state:
;;   * set-data / get-data round-trips, including overwriting a node that
;;     already has a child and writing a nested path ("/a/b/c/d") directly;
;;   * get-data on a path that was never written yields nil;
;;   * mkdirs produces a node with no children;
;;   * delete-node on "/a" also makes the nested "/a/b/c/d" unreadable.
(deftest test-basics
  (with-inprocess-zookeeper zk-port
    (let [state (mk-state zk-port)]
      (.set-data state "/root" (barr 1 2 3))
      (is (Arrays/equals (barr 1 2 3) (.get-data state "/root" false)))
      (is (= nil (.get-data state "/a" false)))

      ;; Writing a child and then overwriting the parent must keep both values.
      (.set-data state "/root/a" (barr 1 2))
      (.set-data state "/root" (barr 1))
      (is (Arrays/equals (barr 1) (.get-data state "/root" false)))
      (is (Arrays/equals (barr 1 2) (.get-data state "/root/a" false)))

      ;; Nested path written without creating the intermediate nodes first.
      (.set-data state "/a/b/c/d" (barr 99))
      (is (Arrays/equals (barr 99) (.get-data state "/a/b/c/d" false)))

      (.mkdirs state "/lalala")
      (is (= [] (.get-children state "/lalala" false)))
      (is (= #{"root" "a" "lalala"} (set (.get-children state "/" false))))

      (.delete-node state "/a")
      (is (= #{"root" "lalala"} (set (.get-children state "/" false))))
      (is (= nil (.get-data state "/a/b/c/d" false)))
      (.close state))))
;; Two independent cluster states connected to the same in-process ZooKeeper
;; must observe each other's writes and deletes.
(deftest test-multi-state
  (with-inprocess-zookeeper zk-port
    (let [state1 (mk-state zk-port)
          state2 (mk-state zk-port)]
      ;; Data written through state1 is readable through both states.
      (.set-data state1 "/root" (barr 1))
      (is (Arrays/equals (barr 1) (.get-data state1 "/root" false)))
      (is (Arrays/equals (barr 1) (.get-data state2 "/root" false)))

      ;; A delete issued through state2 is visible through both states.
      (.delete-node state2 "/root")
      (is (= nil (.get-data state1 "/root" false)))
      (is (= nil (.get-data state2 "/root" false)))

      (.close state1)
      (.close state2))))
;; An ephemeral node created through state1 must stay readable while state1 is
;; open (even after an unrelated state is closed) and must be gone once state1
;; itself is closed.
(deftest test-ephemeral
  (with-inprocess-zookeeper zk-port
    (let [state1 (mk-state zk-port)
          state2 (mk-state zk-port)
          state3 (mk-state zk-port)]
      (.set-ephemeral-node state1 "/a" (barr 1))
      (is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
      (is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))

      ;; Closing a state that did not create the node leaves it in place.
      (.close state3)
      (is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
      (is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))

      ;; Closing the creating state removes the node.
      (.close state1)
      (is (= nil (.get-data state2 "/a" false)))
      (.close state2))))
;; Returns a pair [last-event cb]:
;;   last-event - an atom, initially nil;
;;   cb         - a two-argument function (type, path) that stores
;;                {:type type :path path} into last-event on every call.
;; Tests register cb on a cluster state and inspect the atom to learn which
;; callback fired most recently.
(defn mk-callback-tester []
  ;; Locals are named so they do not shadow clojure.core/last and
  ;; clojure.core/type.
  (let [last-event (atom nil)
        cb (fn [event-type path]
             (reset! last-event {:type event-type :path path}))]
    [last-event cb]))
;; Waits until aatom holds a truthy value, resets the atom to nil and returns
;; the value that was read.
;; Polls every 10 ms; throws RuntimeException if nothing shows up within
;; 30000 ms of the call.
(defn read-and-reset! [aatom]
  (let [started (System/currentTimeMillis)]
    (loop []
      (if-let [val @aatom]
        ;; Value present: clear the atom and hand the value to the caller
        ;; (callers compare the return value against the expected event map).
        (do
          (reset! aatom nil)
          val)
        ;; Nothing yet: enforce the timeout, back off briefly, poll again.
        (do
          (when (> (- (System/currentTimeMillis) started) 30000)
            (throw (RuntimeException. "Waited too long for atom to change state")))
          (Thread/sleep 10)
          (recur))))))
;; Verifies which callbacks fire on which state. Each state gets its own
;; callback recorder from mk-callback-tester. Reads made with `true` as the
;; last argument are the ones the test expects to be followed by a callback
;; (presumably that flag requests a watch -- TODO confirm against the cluster
;; state implementation).
(deftest test-callbacks
  (with-inprocess-zookeeper zk-port
    (let [[state1-last-cb state1-cb] (mk-callback-tester)
          state1 (mk-state zk-port state1-cb)
          [state2-last-cb state2-cb] (mk-callback-tester)
          state2 (mk-state zk-port state2-cb)]
      ;; No callback is expected before anything changes.
      (.set-data state1 "/root" (barr 1))
      (.get-data state2 "/root" true)
      (is (= nil @state1-last-cb))
      (is (= nil @state2-last-cb))

      ;; A data change is reported to state2 only.
      (.set-data state2 "/root" (barr 2))
      (is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb)))
      (is (= nil @state1-last-cb))

      ;; A second change without a fresh watched read triggers nothing.
      (.set-data state2 "/root" (barr 3))
      (is (= nil @state2-last-cb))

      ;; A later unwatched read does not cancel the earlier watched one.
      (.get-data state2 "/root" true)
      (.get-data state2 "/root" false)
      (.delete-node state1 "/root")
      (is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb)))

      ;; A watched read of a missing node reports its creation.
      (.get-data state2 "/root" true)
      (.set-ephemeral-node state1 "/root" (barr 1 2 3 4))
      (is (= {:type :node-created :path "/root"} (read-and-reset! state2-last-cb)))

      ;; Children changes are reported to the state that listed the children.
      (.get-children state1 "/" true)
      (.set-data state2 "/a" (barr 9))
      (is (= nil @state2-last-cb))
      (is (= {:type :node-children-changed :path "/"} (read-and-reset! state1-last-cb)))

      ;; Re-setting an existing ephemeral node is reported as a data change.
      (.get-data state2 "/root" true)
      (.set-ephemeral-node state1 "/root" (barr 1 2))
      (is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb)))

      ;; Creating a child fires both the data watch and the children watch.
      (.mkdirs state1 "/ccc")
      (.get-children state1 "/ccc" true)
      (.get-data state2 "/ccc/b" true)
      (.set-data state2 "/ccc/b" (barr 8))
      (is (= {:type :node-created :path "/ccc/b"} (read-and-reset! state2-last-cb)))
      (is (= {:type :node-children-changed :path "/ccc"} (read-and-reset! state1-last-cb)))

      ;; Closing state1 deletes its ephemeral "/root"; "/root2" is created later.
      (.get-data state2 "/root" true)
      (.get-data state2 "/root2" true)
      (.close state1)
      (is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb)))
      (.set-data state2 "/root2" (barr 9))
      (is (= {:type :node-created :path "/root2"} (read-and-reset! state2-last-cb)))
      (.close state2))))
;; Covers assignment and storm-base bookkeeping on a storm cluster state:
;; setting and overwriting assignments, listing them, activating storms,
;; reading their bases back and removing a storm base.
(deftest test-storm-cluster-state-basics
  (with-inprocess-zookeeper zk-port
    (let [state (mk-storm-state zk-port)
          assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {})
          assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {})
          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {})
          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {})]
      ;; Assignments: starts empty, unknown ids read as nil, later writes win.
      (is (= [] (.assignments state nil)))
      (.set-assignment! state "storm1" assignment1)
      (is (= assignment1 (.assignment-info state "storm1" nil)))
      (is (= nil (.assignment-info state "storm3" nil)))
      (.set-assignment! state "storm1" assignment2)
      (.set-assignment! state "storm3" assignment1)
      (is (= #{"storm1" "storm3"} (set (.assignments state nil))))
      (is (= assignment2 (.assignment-info state "storm1" nil)))
      (is (= assignment1 (.assignment-info state "storm3" nil)))

      ;; Storm bases: activation makes a storm listed and its base readable.
      (is (= [] (.active-storms state)))
      (.activate-storm! state "storm1" base1)
      (is (= ["storm1"] (.active-storms state)))
      (is (= base1 (.storm-base state "storm1" nil)))
      (is (= nil (.storm-base state "storm2" nil)))
      (.activate-storm! state "storm2" base2)
      (is (= base1 (.storm-base state "storm1" nil)))
      (is (= base2 (.storm-base state "storm2" nil)))
      (is (= #{"storm1" "storm2"} (set (.active-storms state))))

      ;; Removing one base leaves the other untouched.
      (.remove-storm-base! state "storm1")
      (is (= base2 (.storm-base state "storm2" nil)))
      (is (= #{"storm2"} (set (.active-storms state))))

      ;; TODO add tests for task info and task heartbeat setting and getting
      (.disconnect state))))
;; Asserts that the errors recorded for (storm-id, component) on state match
;; errors-list position by position:
;;   * the number of recorded errors equals (count errors-list);
;;   * the :error text of each recorded error contains the corresponding
;;     expected substring.
;; Mismatching pairs are printed before the assertion to ease debugging.
(defn- validate-errors! [state storm-id component errors-list]
  (let [errors (.errors state storm-id component)]
    ;;(println errors)
    (is (= (count errors) (count errors-list)))
    ;; map vector stops at the shorter sequence, so a length mismatch is only
    ;; reported by the count assertion above.
    (doseq [[error target] (map vector errors errors-list)]
      (when-not (.contains (:error error) target)
        (println target " => " (:error error)))
      (is (.contains (:error error) target)))))
;; Checks error reporting on a storm cluster state. The assertions expect the
;; recorded errors for a component to come back newest first, and expect only
;; the 10 most recent ones to remain after 15 reports.
(deftest test-storm-cluster-state-errors
  (with-inprocess-zookeeper zk-port
    ;; NOTE(review): advance-time-secs! is assumed to require simulated time,
    ;; so the body runs inside with-simulated-time -- confirm against the
    ;; testing helpers.
    (with-simulated-time
      (let [state (mk-storm-state zk-port)]
        (.report-error state "a" "1" (local-hostname) 6700 (RuntimeException.))
        (validate-errors! state "a" "1" ["RuntimeException"])
        (advance-time-secs! 1)
        (.report-error state "a" "1" (local-hostname) 6700 (IllegalArgumentException.))
        (validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])

        ;; Ten errors for component "2", spaced two seconds apart.
        (doseq [i (range 10)]
          (.report-error state "a" "2" (local-hostname) 6700 (RuntimeException.))
          (advance-time-secs! 2))
        (validate-errors! state "a" "2" (repeat 10 "RuntimeException"))

        ;; Five newer errors displace the five oldest ones.
        (doseq [i (range 5)]
          (.report-error state "a" "2" (local-hostname) 6700 (IllegalArgumentException.))
          (advance-time-secs! 2))
        (validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
                                                (repeat 5 "RuntimeException")))
        (.disconnect state)))))
;; Supervisor heartbeats written through one storm cluster state must be
;; visible through another, and a supervisor must drop out of the listing once
;; the state that wrote its heartbeat disconnects.
(deftest test-supervisor-state
  (with-inprocess-zookeeper zk-port
    (let [state1 (mk-storm-state zk-port)
          state2 (mk-storm-state zk-port)]
      (is (= [] (.supervisors state1 nil)))
      (.supervisor-heartbeat! state2 "2" {:a 1})
      (.supervisor-heartbeat! state1 "1" {})

      ;; Both heartbeats are readable through state1 and listed by both states.
      (is (= {:a 1} (.supervisor-info state1 "2")))
      (is (= {} (.supervisor-info state1 "1")))
      (is (= #{"1" "2"} (set (.supervisors state1 nil))))
      (is (= #{"1" "2"} (set (.supervisors state2 nil))))

      ;; Supervisor "2" was written via state2, so it vanishes with state2.
      (.disconnect state2)
      (is (= #{"1"} (set (.supervisors state1 nil))))
      (.disconnect state1))))
(deftest test-storm-state-callbacks
;; TODO finish