blob: 4e6e3d81cd2de11813172e390f0289110454fca1 [file] [log] [blame]
(ns backtype.storm.integration-test
(:use [clojure test])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
(bootstrap)
;; (deftest test-counter
;; (with-local-cluster [cluster :supervisors 4]
;; (let [state (:storm-cluster-state cluster)
;; nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
;; {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
;; 3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
;; 4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
;; })]
;; (submit-local-topology nimbus
;; "counter"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "counter")
;; (Thread/sleep 10000)
;; )))
;; (deftest test-multilang-fy
;; (with-local-cluster [cluster :supervisors 4]
;; (let [nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {1 (thrift/mk-spout-spec (TestWordSpout. false))}
;; {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "fancy" "tester.fy" ["word"] :parallelism-hint 1)}
;; )]
;; (submit-local-topology nimbus
;; "test"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "test")
;; (Thread/sleep 10000)
;; )))
;; (deftest test-multilang-rb
;; (with-local-cluster [cluster :supervisors 4]
;; (let [nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {1 (thrift/mk-spout-spec (TestWordSpout. false))}
;; {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "ruby" "tester.rb" ["word"] :parallelism-hint 1)}
;; )]
;; (submit-local-topology nimbus
;; "test"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "test")
;; (Thread/sleep 10000)
;; )))
(deftest test-multilang-py
(with-local-cluster [cluster :supervisors 4]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology
{1 (thrift/mk-spout-spec (TestWordSpout. false))}
{2 (thrift/mk-shell-bolt-spec {1 :shuffle} "python" "tester.py" ["word"] :parallelism-hint 1)}
)]
(submit-local-topology nimbus
"test"
{TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
topology)
(Thread/sleep 10000)
(.killTopology nimbus "test")
(Thread/sleep 10000)
)))
(deftest test-basic-topology
(with-simulated-time-local-cluster [cluster :supervisors 4]
(let [topology (thrift/mk-topology
{1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
{2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
})
results (complete-topology cluster
topology
:mock-sources {1 [["nathan"] ["bob"] ["joey"] ["nathan"]]}
:storm-conf {TOPOLOGY-DEBUG true})]
(is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
(read-tuples results 1)))
(is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
(read-tuples results 2)))
(is (= [[1] [2] [3] [4]]
(read-tuples results 3)))
(is (= [[1] [2] [3] [4]]
(read-tuples results 4)))
)))
(deftest test-shuffle
(with-simulated-time-local-cluster [cluster :supervisors 4]
(let [topology (thrift/mk-topology
{1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
{2 (thrift/mk-bolt-spec {1 :shuffle} (TestGlobalCount.)
:parallelism-hint 6)
})
results (complete-topology cluster
topology
;; important for test that
;; #tuples = multiple of 4 and 6
:mock-sources {1 [["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
]}
)]
(is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
(read-tuples results 2)))
)))
(defbolt lalala-bolt1 ["word"] [tuple collector]
(let [ret (-> (.getValue tuple 0) (str "lalala"))]
(.emit collector tuple [ret])
(.ack collector tuple)
))
(defboltfull lalala-bolt2 ["word"]
:let [state (atom nil)]
:prepare ([conf context collector]
(println "prepare")
(reset! state "lalala")
)
:execute ([tuple collector]
(let [ret (-> (.getValue tuple 0) (str @state))]
(.emit collector tuple [ret])
(.ack collector tuple)
)))
(defboltfull lalala-bolt3 ["word"]
:let [state (atom nil)]
:params [prefix]
:prepare ([conf context collector]
(reset! state (str prefix "lalala"))
)
:execute ([tuple collector]
(let [ret (-> (.getValue tuple 0) (str @state))]
(.emit collector tuple [ret])
(.ack collector tuple)
)))
(deftest test-clojure-bolt
(with-simulated-time-local-cluster [cluster :supervisors 4]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology
{1 (thrift/mk-spout-spec (TestWordSpout. false))}
{2 (thrift/mk-bolt-spec {1 :shuffle}
lalala-bolt1)
3 (thrift/mk-bolt-spec {1 :shuffle}
lalala-bolt2)
4 (thrift/mk-bolt-spec {1 :shuffle}
(lalala-bolt3 "_nathan_"))}
)
results (complete-topology cluster
topology
:mock-sources {1 [["david"]
["adam"]
]}
)]
(is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 2)))
(is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 3)))
(is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results 4)))
)))
(defn ack-tracking-feeder [fields]
(let [tracker (AckTracker.)]
[(doto (feeder-spout fields)
(.setAckFailDelegate tracker))
(fn [val]
(is (= (.getNumAcks tracker) val))
(.resetNumAcks tracker)
)]
))
(defboltfull branching-bolt ["num"]
:params [amt]
:execute ([tuple collector]
(doseq [i (range amt)]
(.emit collector tuple [i]))
(.ack collector tuple)
))
(defboltfull agg-bolt ["num"]
:let [seen (atom [])]
:params [amt]
:execute ([tuple collector]
(swap! seen conj tuple)
(when (= (count @seen) amt)
(.emit collector @seen [1])
(doseq [s @seen]
(.ack collector s))
(reset! seen [])
)))
(defbolt ack-bolt {}
[tuple collector]
(.ack collector tuple))
(defbolt identity-bolt ["num"]
[tuple collector]
(.emit collector tuple (.getValues tuple))
(.ack collector tuple))
(deftest test-acking
(with-tracked-cluster [cluster]
(let [[feeder1 checker1] (ack-tracking-feeder ["num"])
[feeder2 checker2] (ack-tracking-feeder ["num"])
[feeder3 checker3] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
{1 [feeder1]
2 [feeder2]
3 [feeder3]}
{4 [{1 :shuffle} (branching-bolt 2)]
5 [{2 :shuffle} (branching-bolt 4)]
6 [{3 :shuffle} (branching-bolt 1)]
7 [{4 :shuffle
5 :shuffle
6 :shuffle} (agg-bolt 3)]
8 [{7 :shuffle} (branching-bolt 2)]
9 [{8 :shuffle} ack-bolt]}
)]
(submit-local-topology (:nimbus cluster)
"test"
{}
(:topology tracked))
(.feed feeder1 [1])
(tracked-wait tracked 1)
(checker1 0)
(.feed feeder2 [1])
(tracked-wait tracked 1)
(checker1 1)
(checker2 1)
(.feed feeder1 [1])
(tracked-wait tracked 1)
(checker1 0)
(.feed feeder1 [1])
(tracked-wait tracked 1)
(checker1 1)
(.feed feeder3 [1])
(tracked-wait tracked 1)
(checker1 0)
(checker3 0)
(.feed feeder2 [1])
(tracked-wait tracked 1)
(checker1 1)
(checker2 1)
(checker3 1)
)))
(deftest test-ack-branching
(with-tracked-cluster [cluster]
(let [[feeder checker] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
{1 [feeder]}
{2 [{1 :shuffle} identity-bolt]
3 [{1 :shuffle} identity-bolt]
4 [{2 :shuffle
3 :shuffle} (agg-bolt 4)]})]
(submit-local-topology (:nimbus cluster)
"test"
{}
(:topology tracked))
(.feed feeder [1])
(tracked-wait tracked 1)
(checker 0)
(.feed feeder [1])
(tracked-wait tracked 1)
(checker 2)
)))
(defbolt dup-anchor ["num"]
[tuple collector]
(.emit collector [tuple tuple] [1])
(.ack collector tuple))
(deftest test-acking-self-anchor
(with-tracked-cluster [cluster]
(let [[feeder checker] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
{1 [feeder]}
{2 [{1 :shuffle} dup-anchor]
3 [{2 :shuffle} ack-bolt]})]
(submit-local-topology (:nimbus cluster)
"test"
{}
(:topology tracked))
(.feed feeder [1])
(tracked-wait tracked 1)
(checker 1)
(.feed feeder [1])
(.feed feeder [1])
(.feed feeder [1])
(tracked-wait tracked 3)
(checker 3)
)))
(deftest test-acking-branching-complex
;; test acking with branching in the topology
)
(deftest test-fields-grouping
;; 1. put a shitload of random tuples through it and test that counts are right
;; 2. test that different spouts with different phints group the same way
)
(deftest test-all-grouping
)
(deftest test-direct-grouping
)