;; blob: b8351c78b9fb8ccbd0a11f436280d0f62ab3e344 [file] [log] [blame]
(ns backtype.storm.integration-test
(:use [clojure test])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
(bootstrap)
;; (deftest test-counter
;; (with-local-cluster [cluster :supervisors 4]
;; (let [state (:storm-cluster-state cluster)
;; nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
;; {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
;; 3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
;; 4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
;; })]
;; (submit-local-topology nimbus
;; "counter"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "counter")
;; (Thread/sleep 10000)
;; )))
;; (deftest test-multilang-fy
;; (with-local-cluster [cluster :supervisors 4]
;; (let [nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {1 (thrift/mk-spout-spec (TestWordSpout. false))}
;; {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "fancy" "tester.fy" ["word"] :parallelism-hint 1)}
;; )]
;; (submit-local-topology nimbus
;; "test"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "test")
;; (Thread/sleep 10000)
;; )))
;; (deftest test-multilang-rb
;; (with-local-cluster [cluster :supervisors 4]
;; (let [nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {1 (thrift/mk-spout-spec (TestWordSpout. false))}
;; {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "ruby" "tester.rb" ["word"] :parallelism-hint 1)}
;; )]
;; (submit-local-topology nimbus
;; "test"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "test")
;; (Thread/sleep 10000)
;; )))
(deftest test-multilang-py
  ;; Smoke test for a shell bolt: a TestWordSpout ("1") is shuffle-grouped into
  ;; a bolt ("2") declared with the "python" command and the "tester.py"
  ;; script. The test makes no assertions -- it only submits the topology,
  ;; lets it run, kills it, and waits again before the cluster macro exits.
  (with-local-cluster [cluster :supervisors 4]
    (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
          bolts {"2" (thrift/mk-shell-bolt-spec {"1" :shuffle}
                                                "python"
                                                "tester.py"
                                                ["word"]
                                                :parallelism-hint 1)}
          storm-conf {TOPOLOGY-OPTIMIZE false
                      TOPOLOGY-WORKERS 20
                      TOPOLOGY-MESSAGE-TIMEOUT-SECS 3
                      TOPOLOGY-DEBUG true}
          nimbus (:nimbus cluster)
          py-topology (thrift/mk-topology spouts bolts)]
      (submit-local-topology nimbus "test" storm-conf py-topology)
      ;; Wall-clock pause (10 s) while the topology runs.
      (Thread/sleep 10000)
      (.killTopology nimbus "test")
      ;; Second wall-clock pause after the kill request.
      (Thread/sleep 10000))))
(deftest test-basic-topology
  ;; Runs the same counting topology to completion twice -- once with
  ;; STORM-LOCAL-MODE-ZMQ set to true and once to false -- and checks the
  ;; tuples recorded for every component.
  (doseq [zmq-on? [true false]]
    (with-simulated-time-local-cluster [cluster :supervisors 4
                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
      (let [words [["nathan"] ["bob"] ["joey"] ["nathan"]]
            running-totals [[1] [2] [3] [4]]
            spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
            bolts {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
                   "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
                   "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))}
            emitted (complete-topology cluster
                                       (thrift/mk-topology spouts bolts)
                                       :mock-sources {"1" words}
                                       :storm-conf {TOPOLOGY-DEBUG true
                                                    TOPOLOGY-WORKERS 2})]
        ;; Components "1" and "2" are compared with ms= (presumably an
        ;; order-insensitive comparison -- confirm in the testing namespace);
        ;; the two globally grouped bolts are compared with strict =.
        (is (ms= words
                 (read-tuples emitted "1")))
        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
                 (read-tuples emitted "2")))
        (is (= running-totals
               (read-tuples emitted "3")))
        (is (= running-totals
               (read-tuples emitted "4")))))))
(deftest test-shuffle
  ;; Feeds 24 mock tuples from a spout with parallelism 4 into a
  ;; shuffle-grouped TestGlobalCount bolt with parallelism 6, then expects the
  ;; bolt output to be six copies of [1] [2] [3] [4] (compared with ms=).
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [;; important for test that #tuples = multiple of 4 and 6:
          ;; 12 repetitions of the ["a"] ["b"] pair = 24 tuples
          input (vec (apply concat (repeat 12 [["a"] ["b"]])))
          shuffle-topology (thrift/mk-topology
                             {"1" (thrift/mk-spout-spec (TestWordSpout. true)
                                                        :parallelism-hint 4)}
                             {"2" (thrift/mk-bolt-spec {"1" :shuffle}
                                                       (TestGlobalCount.)
                                                       :parallelism-hint 6)})
          emitted (complete-topology cluster
                                     shuffle-topology
                                     :mock-sources {"1" input})]
      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
               (read-tuples emitted "2"))))))
(defbolt lalala-bolt1 ["word"] [tuple collector]
  ;; Simple (non-prepared) bolt: appends "lalala" to the first value of the
  ;; incoming tuple, emits the result anchored to that tuple, then acks it.
  (let [suffixed (str (.getValue tuple 0) "lalala")]
    (.emit collector tuple [suffixed])
    (.ack collector tuple)))
(defbolt lalala-bolt2 ["word"] {:prepare true}
  [conf context collector]
  ;; Prepared bolt: the suffix lives in an atom that is created as nil and
  ;; immediately reset to "lalala" before the bolt object is built.
  (let [suffix (doto (atom nil)
                 (reset! "lalala"))]
    (bolt
      (execute [tuple]
        ;; Append the stored suffix to the first value, emit anchored, ack.
        (let [suffixed (str (.getValue tuple 0) @suffix)]
          (.emit collector tuple [suffixed])
          (.ack collector tuple))))))
(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
  [conf context collector]
  ;; Parameterised, prepared bolt: the suffix atom starts as nil and is filled
  ;; in by the bolt's own prepare method with `prefix` followed by "lalala".
  (let [suffix (atom nil)]
    (bolt
      (prepare [_ _ _]
        (reset! suffix (str prefix "lalala")))
      (execute [tuple]
        ;; Append the prepared suffix to the first value, emit anchored, ack.
        (let [suffixed (str (.getValue tuple 0) @suffix)]
          (.emit collector tuple [suffixed])
          (.ack collector tuple))))))
(deftest test-clojure-bolt
  ;; Exercises the three flavours of defbolt defined in this file -- a simple
  ;; bolt (lalala-bolt1), a prepared bolt (lalala-bolt2) and a parameterised
  ;; prepared bolt (lalala-bolt3) -- each shuffle-grouped on the same spout,
  ;; and checks the tuples recorded for each one.
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [topology (thrift/mk-topology
                     {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
                     {"2" (thrift/mk-bolt-spec {"1" :shuffle}
                                               lalala-bolt1)
                      "3" (thrift/mk-bolt-spec {"1" :shuffle}
                                               lalala-bolt2)
                      "4" (thrift/mk-bolt-spec {"1" :shuffle}
                                               (lalala-bolt3 "_nathan_"))})
          results (complete-topology cluster
                                     topology
                                     :mock-sources {"1" [["david"]
                                                         ["adam"]]})]
      ;; Bolts "2" and "3" append "lalala"; bolt "4" appends its parameter
      ;; "_nathan_" followed by "lalala".
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
      (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4"))))))
(defn ack-tracking-feeder
  "Creates a feeder spout over `fields` whose ack/fail delegate is a fresh
  AckTracker. Returns a two-element vector [spout checker]; `checker` is a
  one-argument fn that asserts (with `is`) that the tracker's ack count equals
  its argument and then resets that count."
  [fields]
  (let [tracker (AckTracker.)
        spout (feeder-spout fields)
        check-and-reset! (fn [expected]
                           (is (= (.getNumAcks tracker) expected))
                           (.resetNumAcks tracker))]
    (.setAckFailDelegate spout tracker)
    [spout check-and-reset!]))
(defbolt branching-bolt ["num"]
  {:params [amt]}
  [tuple collector]
  ;; Fans one input tuple out into `amt` output tuples ([0] .. [amt - 1]),
  ;; each anchored to the input, and then acks the input.
  (doseq [branch (range amt)]
    (emit-bolt! collector [branch] :anchor tuple))
  (ack! collector tuple))
(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
  [conf context collector]
  ;; Buffers incoming tuples; once `amt` of them have accumulated it emits a
  ;; single [1] anchored to the whole buffer, acks every buffered tuple and
  ;; empties the buffer.
  (let [pending (atom [])]
    (bolt
      (execute [tuple]
        ;; swap! returns the buffer including the tuple just added.
        (let [batch (swap! pending conj tuple)]
          (when (= (count batch) amt)
            (emit-bolt! collector [1] :anchor batch)
            (doseq [member batch]
              (ack! collector member))
            (reset! pending [])))))))
;; Sink bolt: acks every tuple it receives and emits nothing. The `{}` sits in
;; the position where the other bolts in this file list their output fields.
(defbolt ack-bolt {}
[tuple collector]
(ack! collector tuple))
;; Pass-through bolt: re-emits the values of the incoming tuple unchanged,
;; anchored to that tuple, and then acks it.
(defbolt identity-bolt ["num"]
[tuple collector]
(emit-bolt! collector (.getValues tuple) :anchor tuple)
(ack! collector tuple))
(deftest test-acking
  ;; Three ack-tracked feeder spouts fan out through branching bolts
  ;; (2, 4 and 1 branches) into one agg-bolt that groups tuples in threes;
  ;; its output is branched again and finally acked by ack-bolt. A spout tuple
  ;; counts as acked only once every tuple descended from it has been acked.
  ;; Each checker call also resets its tracker, so the numbers below are acks
  ;; since the previous check of that spout.
  ;; NOTE(review): the expected counts assume the aggregator sees tuples in
  ;; feed order on a single task -- confirm against mk-tracked-topology.
  (with-tracked-cluster [cluster]
    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
          [feeder2 checker2] (ack-tracking-feeder ["num"])
          [feeder3 checker3] (ack-tracking-feeder ["num"])
          tracked (mk-tracked-topology
                    cluster
                    {"1" [feeder1]
                     "2" [feeder2]
                     "3" [feeder3]}
                    {"4" [{"1" :shuffle} (branching-bolt 2)]
                     "5" [{"2" :shuffle} (branching-bolt 4)]
                     "6" [{"3" :shuffle} (branching-bolt 1)]
                     "7" [{"4" :shuffle
                           "5" :shuffle
                           "6" :shuffle} (agg-bolt 3)]
                     "8" [{"7" :shuffle} (branching-bolt 2)]
                     "9" [{"8" :shuffle} ack-bolt]})
          ;; Feed a single [1] tuple and wait for the tracked topology.
          feed-one! (fn [feeder]
                      (.feed feeder [1])
                      (tracked-wait tracked 1))]
      (submit-local-topology (:nimbus cluster)
                             "acking-test1"
                             {TOPOLOGY-DEBUG true}
                             (:topology tracked))
      ;; 2 tuples reach the aggregator; it needs 3, so nothing is acked yet.
      (feed-one! feeder1)
      (checker1 0)
      ;; 4 more arrive (6 total): two full groups, every pending tuple acked.
      (feed-one! feeder2)
      (checker1 1)
      (checker2 1)
      ;; 2 pending again.
      (feed-one! feeder1)
      (checker1 0)
      ;; 4 pending: one group of 3 completes the previous feeder1 tuple,
      ;; 1 branch of this tuple is left over.
      (feed-one! feeder1)
      (checker1 1)
      ;; 1 more arrives from feeder3: 2 pending, still no full group.
      (feed-one! feeder3)
      (checker1 0)
      (checker3 0)
      ;; 4 more arrive (6 pending): two groups, everything outstanding acked.
      (feed-one! feeder2)
      (checker1 1)
      (checker2 1)
      (checker3 1))))
(deftest test-ack-branching
  ;; One ack-tracked feeder spout is subscribed to by two identity bolts, and
  ;; both feed an agg-bolt that groups tuples in fours. A single spout tuple
  ;; therefore yields 2 aggregator inputs; acks are only expected once two
  ;; spout tuples have been fed.
  (with-tracked-cluster [cluster]
    (let [[feeder checker] (ack-tracking-feeder ["num"])
          tracked (mk-tracked-topology
                    cluster
                    {"1" [feeder]}
                    {"2" [{"1" :shuffle} identity-bolt]
                     "3" [{"1" :shuffle} identity-bolt]
                     "4" [{"2" :shuffle
                           "3" :shuffle} (agg-bolt 4)]})
          ;; Feed a single [1] tuple and wait for the tracked topology.
          feed-one! (fn []
                      (.feed feeder [1])
                      (tracked-wait tracked 1))]
      (submit-local-topology (:nimbus cluster)
                             "test-acking2"
                             {}
                             (:topology tracked))
      ;; 2 of the 4 tuples the aggregator needs: nothing acked yet.
      (feed-one!)
      (checker 0)
      ;; The group of 4 is complete: both spout tuples are acked.
      (feed-one!)
      (checker 2))))
;; Emits [1] anchored to the same input tuple twice (the anchor list holds the
;; tuple two times) and then acks the input. Used by test-acking-self-anchor.
(defbolt dup-anchor ["num"]
[tuple collector]
(emit-bolt! collector [1] :anchor [tuple tuple])
(ack! collector tuple))
(deftest test-acking-self-anchor
  ;; Checks that spout tuples are still acked when a bolt (dup-anchor) anchors
  ;; its output to the same input tuple twice: first with one tuple, then
  ;; with three tuples fed back to back.
  (with-tracked-cluster [cluster]
    (let [[feeder checker] (ack-tracking-feeder ["num"])
          tracked (mk-tracked-topology
                    cluster
                    {"1" [feeder]}
                    {"2" [{"1" :shuffle} dup-anchor]
                     "3" [{"2" :shuffle} ack-bolt]})]
      (submit-local-topology (:nimbus cluster)
                             "test"
                             {}
                             (:topology tracked))
      ;; Single tuple: expect exactly one ack.
      (.feed feeder [1])
      (tracked-wait tracked 1)
      (checker 1)
      ;; Three tuples before waiting: expect three acks.
      (dotimes [_ 3]
        (.feed feeder [1]))
      (tracked-wait tracked 3)
      (checker 3))))
;; (defspout ConstantSpout ["val"] {:prepare false}
;; [collector]
;; (Time/sleep 100)
;; (emit-spout! collector [1]))
;; (def errored (atom false))
;; (def restarted (atom false))
;; (defbolt local-error-checker {} [tuple collector]
;; (when-not @errored
;; (reset! errored true)
;; (println "erroring")
;; (throw (RuntimeException.)))
;; (when-not @restarted (println "restarted"))
;; (reset! restarted true))
;; (deftest test-no-halt-local-mode
;; (with-simulated-time-local-cluster [cluster]
;; (let [topology (topology
;; {1 (spout-spec ConstantSpout)}
;; {2 (bolt-spec {1 :shuffle} local-error-checker)
;; })]
;; (submit-local-topology (:nimbus cluster)
;; "test"
;; {}
;; topology)
;; (while (not @restarted)
;; (advance-time-ms! 100))
;; )))
(defspout IncSpout ["word"]
  [conf context collector]
  ;; Spout whose emitted value is the number of acks received so far: every
  ;; nextTuple call sleeps 100 ms and emits the current count with message
  ;; id 1; every ack increments the count.
  (let [ack-count (atom 0)]
    (spout
      (nextTuple []
        (Thread/sleep 100)
        (emit-spout! collector [(deref ack-count)] :id 1))
      (ack [id]
        (swap! ack-count inc)))))
(defspout IncSpout2 ["word"] {:params [prefix]}
  [conf context collector]
  ;; Parameterised spout: every nextTuple call sleeps 100 ms, increments a
  ;; counter and emits "<prefix>-<counter>" without a message id.
  (let [counter (atom 0)]
    (spout
      (nextTuple []
        (Thread/sleep 100)
        ;; swap! returns the freshly incremented value.
        (let [n (swap! counter inc)]
          (emit-spout! collector [(str prefix "-" n)]))))))
;; (deftest test-clojure-spout
;; (with-local-cluster [cluster]
;; (let [nimbus (:nimbus cluster)
;; top (topology
;; {1 (spout-spec IncSpout)}
;; {}
;; )]
;; (submit-local-topology nimbus
;; "spout-test"
;; {TOPOLOGY-DEBUG true
;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
;; top)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "spout-test")
;; (Thread/sleep 10000)
;; )))
(deftest test-acking-branching-complex
;; TODO: not implemented -- the body is empty, so this test currently passes
;; without checking anything.
;; Intended coverage: test acking with branching in the topology.
)
(deftest test-fields-grouping
;; TODO: not implemented -- the body is empty, so this test currently passes
;; without checking anything. Intended coverage:
;; 1. put a large number of random tuples through it and test that the counts are right
;; 2. test that different spouts with different parallelism hints group the same way
)
(deftest test-all-grouping
;; TODO: not implemented -- the body is empty, so this test currently passes
;; without checking anything. Presumably meant to cover the "all" stream
;; grouping (inferred from the test name only).
)
(deftest test-direct-grouping
;; TODO: not implemented -- the body is empty, so this test currently passes
;; without checking anything. Presumably meant to cover the "direct" stream
;; grouping (inferred from the test name only).
)