;; blob: 6bdcb0e85a1e8541aeda856d00774b8198d24eb4 [file] [log] [blame]
(ns backtype.storm.integration-test
(:use [clojure test])
(:import [backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
;; Invoke the `bootstrap` macro made available by the
;; `(:use [backtype.storm bootstrap testing])` clause of the ns form.
;; NOTE(review): presumably this is what brings the unqualified names used
;; throughout this file (thrift/..., defbolt, Fields, TOPOLOGY-* constants)
;; into scope -- confirm against the bootstrap namespace.
(bootstrap)
;; (deftest test-counter
;; (with-local-cluster [cluster :supervisors 4]
;; (let [state (:storm-cluster-state cluster)
;; nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
;; {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
;; "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
;; "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
;; })]
;; (submit-local-topology nimbus
;; "counter"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "counter")
;; (Thread/sleep 10000)
;; )))
;; (deftest test-multilang-fy
;; (with-local-cluster [cluster :supervisors 4]
;; (let [nimbus (:nimbus cluster)
;; topology (thrift/mk-topology
;; {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
;; {"2" (thrift/mk-shell-bolt-spec {"1" :shuffle} "fancy" "tester.fy" ["word"] :parallelism-hint 1)}
;; )]
;; (submit-local-topology nimbus
;; "test"
;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;; topology)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "test")
;; (Thread/sleep 10000)
;; )))
;; Smoke test for Ruby shell components: a shell spout running
;; "tester_spout.rb" feeds a shell bolt running "tester_bolt.rb" inside a
;; `with-local-cluster` cluster. The topology is submitted, left running for
;; 10 seconds, killed, and the test then waits another 10 seconds. No
;; assertions are made on the tuples that flow through.
(deftest test-multilang-rb
  (with-local-cluster [cluster :supervisors 4]
    (let [nimbus (:nimbus cluster)
          ruby-spout (thrift/mk-shell-spout-spec ["ruby" "tester_spout.rb"] ["word"])
          ;; The bolt command is passed here as separate command / script strings.
          ruby-bolt (thrift/mk-shell-bolt-spec {"1" :shuffle}
                                               "ruby" "tester_bolt.rb" ["word"]
                                               :parallelism-hint 1)
          topology (thrift/mk-topology {"1" ruby-spout} {"2" ruby-bolt})
          storm-conf {TOPOLOGY-OPTIMIZE false
                      TOPOLOGY-WORKERS 20
                      TOPOLOGY-MESSAGE-TIMEOUT-SECS 3
                      TOPOLOGY-DEBUG true}]
      (submit-local-topology nimbus "test" storm-conf topology)
      (Thread/sleep 10000)
      (.killTopology nimbus "test")
      (Thread/sleep 10000))))
;; Smoke test for Python shell components: a shell spout running
;; "tester_spout.py" feeds a shell bolt running "tester_bolt.py" inside a
;; `with-local-cluster` cluster. The topology is submitted, left running for
;; 10 seconds, killed, and the test then waits another 10 seconds. No
;; assertions are made on the tuples that flow through.
(deftest test-multilang-py
  (with-local-cluster [cluster :supervisors 4]
    (let [nimbus (:nimbus cluster)
          python-spout (thrift/mk-shell-spout-spec ["python" "tester_spout.py"] ["word"])
          ;; The bolt command is passed here as a single [command script] vector.
          python-bolt (thrift/mk-shell-bolt-spec {"1" :shuffle}
                                                 ["python" "tester_bolt.py"]
                                                 ["word"]
                                                 :parallelism-hint 1)
          topology (thrift/mk-topology {"1" python-spout} {"2" python-bolt})
          storm-conf {TOPOLOGY-OPTIMIZE false
                      TOPOLOGY-WORKERS 20
                      TOPOLOGY-MESSAGE-TIMEOUT-SECS 3
                      TOPOLOGY-DEBUG true}]
      (submit-local-topology nimbus "test" storm-conf topology)
      (Thread/sleep 10000)
      (.killTopology nimbus "test")
      (Thread/sleep 10000))))
;; Runs a word-count style topology to completion, once with
;; STORM-LOCAL-MODE-ZMQ set to true and once with it set to false, and checks
;; the tuples recorded for every component:
;;   "1" spout (mocked with four words)
;;   "2" TestWordCounter, fed from "1" grouped on the "word" field
;;   "3" TestGlobalCount, fed from "1" with a global grouping
;;   "4" TestAggregatesCounter, fed from "2" with a global grouping
(deftest test-basic-topology
  (doseq [zmq-on? [true false]]
    (with-simulated-time-local-cluster [cluster :supervisors 4
                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
      (let [words [["nathan"] ["bob"] ["joey"] ["nathan"]]
            ;; Both globally grouped bolts are expected to emit 1..4 in order.
            running-totals [[1] [2] [3] [4]]
            spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
            bolts {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
                   "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
                   "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))}
            results (complete-topology cluster
                                       (thrift/mk-topology spouts bolts)
                                       :mock-sources {"1" words}
                                       :storm-conf {TOPOLOGY-WORKERS 2})]
        ;; ms= is used where ordering across tasks is not asserted.
        (is (ms= words (read-tuples results "1")))
        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
                 (read-tuples results "2")))
        (is (= running-totals (read-tuples results "3")))
        (is (= running-totals (read-tuples results "4")))))))
;; Bolt declaring the single output field "num": re-emits the values of every
;; incoming tuple unchanged, anchored to that tuple, and then acks it.
(defbolt identity-bolt ["num"]
  [tuple collector]
  (let [values (.getValues tuple)]
    (emit-bolt! collector values :anchor tuple))
  (ack! collector tuple))
;; Checks that a bolt can subscribe to the "__system" stream of a spout in
;; addition to its default stream. Bolt "2" receives spout "1" grouped on
;; "word" plus ["1" "__system"] with a global grouping, and is expected to see
;; the three mocked words together with three ["startup"] tuples.
(deftest test-system-stream
  ;; this test works because mocking a spout splits up the tuples evenly among the tasks
  (with-simulated-time-local-cluster [cluster]
    (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
          bolt-inputs {"1" ["word"]
                       ["1" "__system"] :global}
          bolts {"2" (thrift/mk-bolt-spec bolt-inputs identity-bolt :p 1)}
          ;; NOTE(review): the three "startup" tuples presumably correspond to
          ;; the three spout tasks requested with :p 3 -- confirm.
          expected [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
          results (complete-topology cluster
                                     (thrift/mk-topology spouts bolts)
                                     :mock-sources {"1" [["a"] ["b"] ["c"]]}
                                     :storm-conf {TOPOLOGY-WORKERS 2})]
      (is (ms= expected (read-tuples results "2"))))))
;; Checks that a shuffle grouping spreads tuples evenly: 24 mocked tuples from
;; a spout with parallelism 4 are sent to a TestGlobalCount bolt with
;; parallelism 6, and the recorded output must be six copies of [1] [2] [3] [4]
;; (compared as a multiset).
(deftest test-shuffle
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
          bolts {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
                                          :parallelism-hint 6)}
          ;; important for test that
          ;; #tuples = multiple of 4 and 6
          ;; (twelve ["a"] ["b"] pairs = 24 tuples)
          mock-tuples (vec (apply concat (repeat 12 [["a"] ["b"]])))
          results (complete-topology cluster
                                     (thrift/mk-topology spouts bolts)
                                     :mock-sources {"1" mock-tuples})]
      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
               (read-tuples results "2"))))))
;; Simple (non-prepared) bolt: destructures the first value of the incoming
;; tuple positionally, appends "lalala" to it, emits the result anchored to the
;; input tuple and acks the input.
(defbolt lalala-bolt1 ["word"] [[word :as tuple] collector]
  (emit-bolt! collector [(str word "lalala")] :anchor tuple)
  (ack! collector tuple))
;; Prepared bolt: the suffix is held in an atom that is set to "lalala" in the
;; prepare body before the bolt implementation is returned. Each executed
;; tuple has its first value concatenated with that suffix, emitted anchored to
;; the input, and the input is acked.
(defbolt lalala-bolt2 ["word"] {:prepare true}
  [conf context collector]
  (let [suffix (atom nil)]
    (reset! suffix "lalala")
    (bolt
      (execute [tuple]
        (let [word (.getValue tuple 0)
              out (str word @suffix)]
          (emit-bolt! collector [out] :anchor tuple)
          (ack! collector tuple))))))
;; Prepared, parameterised bolt: `prefix` is supplied when the bolt is
;; instantiated. The inner `prepare` stores (str prefix "lalala") in an atom;
;; `execute` appends that stored string to the first value of each tuple, emits
;; the result anchored to the input, and acks the input.
(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
  [conf context collector]
  (let [suffix (atom nil)]
    (bolt
      (prepare [_ _ _]
        (reset! suffix (str prefix "lalala")))
      ;; The tuple is destructured by field name here; the bound value itself
      ;; is not used below (the body reads position 0 directly).
      (execute [{word-by-name "word" :as tuple}]
        (let [word (.getValue tuple 0)
              out (str word @suffix)]
          (emit-bolt! collector [out] :anchor tuple)
          (ack! collector tuple))))))
;; Exercises the three styles of Clojure-defined bolts in one topology:
;;   "2" lalala-bolt1 -- simple bolt, shuffle grouping
;;   "3" lalala-bolt2 -- prepared bolt, local-or-shuffle grouping
;;   "4" lalala-bolt3 -- prepared bolt with a constructor parameter
;; Each bolt reads the mocked spout "1" and must emit every word with the
;; expected suffix appended (compared as a multiset).
;;
;; The previously bound but never used local `nimbus` has been removed;
;; `complete-topology` only needs the cluster itself.
(deftest test-clojure-bolt
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [topology (thrift/mk-topology
                     {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
                     {"2" (thrift/mk-bolt-spec {"1" :shuffle}
                                               lalala-bolt1)
                      "3" (thrift/mk-bolt-spec {"1" :local-or-shuffle}
                                               lalala-bolt2)
                      "4" (thrift/mk-bolt-spec {"1" :shuffle}
                                               (lalala-bolt3 "_nathan_"))})
          results (complete-topology cluster
                                     topology
                                     :mock-sources {"1" [["david"]
                                                         ["adam"]]})]
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
      (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4"))))))
;; Bolt that emits maps instead of value vectors. For the word "bar" it emits a
;; literal map whose keys mix keywords and a string; for any other word it
;; emits the incoming tuple with :period, :exclamation and :question entries
;; associated onto it. The emit is not anchored; the input is acked afterwards
;; in both cases.
(defbolt punctuator-bolt ["word" "period" "question" "exclamation"]
  [tuple collector]
  (let [word (:word tuple)]
    (if (= word "bar")
      (emit-bolt! collector {:word "bar" :period "bar" :question "bar"
                             "exclamation" "bar"})
      (emit-bolt! collector (-> tuple
                                (assoc :period (str word "."))
                                (assoc :exclamation (str word "!"))
                                (assoc :question (str word "?")))))
    (ack! collector tuple)))
;; Verifies that a bolt may emit a map keyed by field name: punctuator-bolt is
;; fed "foo" and "bar" and the recorded tuples must come out in declared field
;; order (word, period, question, exclamation).
(deftest test-map-emit
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [spouts {"words" (thrift/mk-spout-spec (TestWordSpout. false))}
          bolts {"out" (thrift/mk-bolt-spec {"words" :shuffle} punctuator-bolt)}
          results (complete-topology cluster
                                     (thrift/mk-topology spouts bolts)
                                     :mock-sources {"words" [["foo"] ["bar"]]})]
      (is (ms= [["foo" "foo." "foo?" "foo!"]
                ["bar" "bar" "bar" "bar"]]
               (read-tuples results "out"))))))
(defn ack-tracking-feeder
  "Builds a feeder spout for `fields` whose ack/fail delegate is a fresh
  AckTracker. Returns a pair [spout checker], where `checker` is a function of
  one argument that asserts (via `is`) that the tracker's ack count equals the
  argument and then resets that count."
  [fields]
  (let [tracker (AckTracker.)
        spout (feeder-spout fields)
        check-acks! (fn [expected]
                      (is (= (.getNumAcks tracker) expected))
                      ;; reset so that the next check only counts new acks
                      (.resetNumAcks tracker))]
    (.setAckFailDelegate spout tracker)
    [spout check-acks!]))
;; Parameterised bolt: for every input tuple emits `amt` tuples ([0], [1], ...,
;; [amt - 1]), each anchored to the input, and then acks the input.
(defbolt branching-bolt ["num"]
  {:params [amt]}
  [tuple collector]
  (dotimes [i amt]
    (emit-bolt! collector [i] :anchor tuple))
  (ack! collector tuple))
;; Parameterised, prepared bolt that batches its input: incoming tuples are
;; accumulated in an atom; once `amt` tuples have been collected it emits [1]
;; anchored to all of them, acks each collected tuple, and clears the buffer.
;; Tuples are therefore not acked until their batch is full.
(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
  [conf context collector]
  (let [pending (atom [])]
    (bolt
      (execute [tuple]
        (let [batch (swap! pending conj tuple)]
          (when (= (count batch) amt)
            (emit-bolt! collector [1] :anchor batch)
            (doseq [t batch]
              (ack! collector t))
            (reset! pending [])))))))
;; Terminal bolt with an empty output spec: emits nothing and simply acks every
;; tuple it receives.
(defbolt ack-bolt {} [tuple collector]
  (ack! collector tuple))
;; Verifies spout-level acking through a topology that fans out and batches:
;;   spouts "1" "2" "3" -> branching bolts "4" (x2), "5" (x4), "6" (x1)
;;   all three feed "7" = (agg-bolt 3), which only acks once it holds 3 tuples
;;   "7" -> "8" (branching x2) -> "9" (ack-bolt)
;; Each checker asserts the number of spout acks seen since its previous check
;; and then resets the counter.
(deftest test-acking
  (with-tracked-cluster [cluster]
    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
          [feeder2 checker2] (ack-tracking-feeder ["num"])
          [feeder3 checker3] (ack-tracking-feeder ["num"])
          spouts {"1" (spout-spec feeder1)
                  "2" (spout-spec feeder2)
                  "3" (spout-spec feeder3)}
          bolts {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
                 "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
                 "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
                 "7" (bolt-spec
                       {"4" :shuffle
                        "5" :shuffle
                        "6" :shuffle}
                       (agg-bolt 3))
                 "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
                 "9" (bolt-spec {"8" :shuffle} ack-bolt)}
          tracked (mk-tracked-topology cluster (topology spouts bolts))
          ;; Feed a single [1] tuple and wait for the tracked topology.
          feed-one! (fn [feeder]
                      (.feed feeder [1])
                      (tracked-wait tracked 1))]
      (submit-local-topology (:nimbus cluster)
                             "acking-test1"
                             {}
                             (:topology tracked))
      ;; 2 tuples reach the aggregator: batch of 3 not yet full, nothing acked.
      (feed-one! feeder1)
      (checker1 0)
      ;; 4 more arrive (6 total): two full batches, both spout tuples complete.
      (feed-one! feeder2)
      (checker1 1)
      (checker2 1)
      ;; 2 pending in the aggregator again.
      (feed-one! feeder1)
      (checker1 0)
      ;; 4 seen: one batch of 3 acked, completing only the earlier spout tuple.
      (feed-one! feeder1)
      (checker1 1)
      ;; 1 left over + 1 from spout 3 = 2 pending, nothing new acked.
      (feed-one! feeder3)
      (checker1 0)
      (checker3 0)
      ;; 4 more make 6 pending: two full batches flush everything outstanding.
      (feed-one! feeder2)
      (checker1 1)
      (checker2 1)
      (checker3 1))))
;; Verifies acking when one spout tuple is delivered to two separate bolts:
;; spout "1" feeds identity bolts "2" and "3", and both feed "4" = (agg-bolt 4).
;; After one spout tuple the aggregator holds only 2 tuples, so nothing is
;; acked; after the second it holds 4, flushes, and both spout tuples are acked.
(deftest test-ack-branching
  (with-tracked-cluster [cluster]
    (let [[feeder checker] (ack-tracking-feeder ["num"])
          spouts {"1" (spout-spec feeder)}
          bolts {"2" (bolt-spec {"1" :shuffle} identity-bolt)
                 "3" (bolt-spec {"1" :shuffle} identity-bolt)
                 "4" (bolt-spec
                       {"2" :shuffle
                        "3" :shuffle}
                       (agg-bolt 4))}
          tracked (mk-tracked-topology cluster (topology spouts bolts))
          ;; Feed a single [1] tuple and wait for the tracked topology.
          feed-one! (fn []
                      (.feed feeder [1])
                      (tracked-wait tracked 1))]
      (submit-local-topology (:nimbus cluster)
                             "test-acking2"
                             {}
                             (:topology tracked))
      (feed-one!)
      (checker 0)
      (feed-one!)
      (checker 2))))
;; Bolt that emits [1] anchored to the *same* input tuple twice, then acks the
;; input. Used to check that duplicate anchors do not break ack tracking.
(defbolt dup-anchor ["num"]
  [tuple collector]
  (let [anchors [tuple tuple]]
    (emit-bolt! collector [1] :anchor anchors))
  (ack! collector tuple))
;; Verifies that a tuple anchored twice to the same parent (see dup-anchor) is
;; still fully acked: one fed tuple yields one spout ack, and three fed tuples
;; yield three spout acks.
(deftest test-acking-self-anchor
  (with-tracked-cluster [cluster]
    (let [[feeder checker] (ack-tracking-feeder ["num"])
          spouts {"1" (spout-spec feeder)}
          bolts {"2" (bolt-spec {"1" :shuffle} dup-anchor)
                 "3" (bolt-spec {"2" :shuffle} ack-bolt)}
          tracked (mk-tracked-topology cluster (topology spouts bolts))]
      (submit-local-topology (:nimbus cluster)
                             "test"
                             {}
                             (:topology tracked))
      ;; single tuple
      (.feed feeder [1])
      (tracked-wait tracked 1)
      (checker 1)
      ;; three tuples fed back to back, waited on together
      (dotimes [_ 3]
        (.feed feeder [1]))
      (tracked-wait tracked 3)
      (checker 3))))
;; (defspout ConstantSpout ["val"] {:prepare false}
;; [collector]
;; (Time/sleep 100)
;; (emit-spout! collector [1]))
;; (def errored (atom false))
;; (def restarted (atom false))
;; (defbolt local-error-checker {} [tuple collector]
;; (when-not @errored
;; (reset! errored true)
;; (println "erroring")
;; (throw (RuntimeException.)))
;; (when-not @restarted (println "restarted"))
;; (reset! restarted true))
;; (deftest test-no-halt-local-mode
;; (with-simulated-time-local-cluster [cluster]
;; (let [topology (topology
;; {1 (spout-spec ConstantSpout)}
;; {2 (bolt-spec {1 :shuffle} local-error-checker)
;; })]
;; (submit-local-topology (:nimbus cluster)
;; "test"
;; {}
;; topology)
;; (while (not @restarted)
;; (advance-time-ms! 100))
;; )))
;; Spout that, every 100 ms, emits the current value of a counter with the
;; fixed message id 1. The counter starts at 0 and is incremented each time an
;; ack is received.
(defspout IncSpout ["word"]
  [conf context collector]
  (let [acked-count (atom 0)]
    (spout
      (nextTuple []
        (Thread/sleep 100)
        (emit-spout! collector [@acked-count] :id 1))
      (ack [id]
        (swap! acked-count inc)))))
;; Parameterised spout: every 100 ms increments an internal counter (starting
;; from 0) and emits the string "<prefix>-<counter>" without a message id.
(defspout IncSpout2 ["word"] {:params [prefix]}
  [conf context collector]
  (let [counter (atom 0)]
    (spout
      (nextTuple []
        (Thread/sleep 100)
        (let [n (swap! counter inc)]
          (emit-spout! collector [(str prefix "-" n)]))))))
;; (deftest test-clojure-spout
;; (with-local-cluster [cluster]
;; (let [nimbus (:nimbus cluster)
;; top (topology
;; {1 (spout-spec IncSpout)}
;; {}
;; )]
;; (submit-local-topology nimbus
;; "spout-test"
;; {TOPOLOGY-DEBUG true
;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
;; top)
;; (Thread/sleep 10000)
;; (.killTopology nimbus "spout-test")
;; (Thread/sleep 10000)
;; )))
;; Checks how configuration from several sources is combined for one
;; component built with the Java TopologyBuilder API. Bolt "2" (TestConfBolt)
;; is given its own conf map, a max task parallelism of 2 and an extra
;; "fake.config2" entry; the topology is run with an additional
;; TOPOLOGY-KRYO-REGISTER list in :storm-conf. The mocked spout emits config
;; key names, and the key/value pairs recorded for the bolt are compared with
;; the expected effective configuration.
(deftest test-component-specific-config
  (with-simulated-time-local-cluster [cluster
                                      :daemon-conf {TOPOLOGY-OPTIMIZE false
                                                    TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
    (letlocals
      (bind builder (TopologyBuilder.))
      ;; configuration handed directly to the bolt instance
      (bind component-conf
            {"fake.config" 123
             TOPOLOGY-MAX-TASK-PARALLELISM 20
             TOPOLOGY-MAX-SPOUT-PENDING 30
             TOPOLOGY-OPTIMIZE true
             TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
                                     {"fake.type2" "a.serializer"}]})
      ;; config keys the mocked spout asks the bolt about
      (bind queried-keys
            [["fake.config"]
             [TOPOLOGY-MAX-TASK-PARALLELISM]
             [TOPOLOGY-MAX-SPOUT-PENDING]
             [TOPOLOGY-OPTIMIZE]
             ["fake.config2"]
             [TOPOLOGY-KRYO-REGISTER]])
      (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
      (-> builder
          (.setBolt "2" (TestConfBolt. component-conf))
          (.shuffleGrouping "1")
          (.setMaxTaskParallelism 2)
          (.addConfiguration "fake.config2" 987))
      (bind results
            (complete-topology cluster
                               (.createTopology builder)
                               :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
                               :mock-sources {"1" queried-keys}))
      (is (= {"fake.config" 123
              "fake.config2" 987
              TOPOLOGY-MAX-TASK-PARALLELISM 2
              TOPOLOGY-MAX-SPOUT-PENDING 30
              TOPOLOGY-OPTIMIZE false
              TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
                                      "fake.type2" "a.serializer"
                                      "fake.type3" "a.serializer3"}}
             ;; flatten the recorded [key value] tuples into a map
             (->> (read-tuples results "2")
                  (apply concat)
                  (apply hash-map)))))))
;; Prepared bolt used to inspect effective configuration. The `conf` parameter
;; given at instantiation is passed as the component's :conf option; inside the
;; prepare function `conf` refers to the first prepare argument instead. For
;; each tuple, the first value is treated as a key: the special key
;; "!MAX_MSG_TIMEOUT" is answered with (.maxTopologyMessageTimeout context),
;; anything else is looked up in `conf`. Emits [key value] anchored to the
;; input and acks the input.
(defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf}
  [conf context collector]
  (bolt
    (execute [tuple]
      (let [conf-key (.getValue tuple 0)
            conf-val (if (= conf-key "!MAX_MSG_TIMEOUT")
                       (.maxTopologyMessageTimeout context)
                       (get conf conf-key))]
        (emit-bolt! collector [conf-key conf-val] :anchor tuple)
        (ack! collector tuple)))))
;; Clojure-DSL counterpart of the component-specific config test. The spout is
;; given a :conf with a 40 s message timeout, the bolt is given both a conf
;; parameter and a :conf override for TOPOLOGY-MAX-SPOUT-PENDING, and the
;; topology is run as "test123" with its own :storm-conf. conf-query-bolt
;; reports the value it sees for each queried key, and the collected key/value
;; pairs are compared with the expected effective configuration.
(deftest test-component-specific-config-clojure
  (with-simulated-time-local-cluster [cluster]
    (let [spouts {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"]))
                                  :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})}
          bolts {"2" (bolt-spec {"1" :shuffle}
                                (conf-query-bolt {"fake.config" 1
                                                  TOPOLOGY-MAX-TASK-PARALLELISM 2
                                                  TOPOLOGY-MAX-SPOUT-PENDING 10})
                                :conf {TOPOLOGY-MAX-SPOUT-PENDING 3})}
          ;; keys the mocked spout asks the bolt about
          queried-keys [["fake.config"]
                        [TOPOLOGY-MAX-TASK-PARALLELISM]
                        [TOPOLOGY-MAX-SPOUT-PENDING]
                        ["!MAX_MSG_TIMEOUT"]
                        [TOPOLOGY-NAME]]
          results (complete-topology cluster
                                     (topology spouts bolts)
                                     :topology-name "test123"
                                     :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10
                                                  TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
                                     :mock-sources {"1" queried-keys})]
      (is (= {"fake.config" 1
              TOPOLOGY-MAX-TASK-PARALLELISM 2
              TOPOLOGY-MAX-SPOUT-PENDING 3
              "!MAX_MSG_TIMEOUT" 40
              TOPOLOGY-NAME "test123"}
             ;; flatten the recorded [key value] tuples into a map
             (->> (read-tuples results "2")
                  (apply concat)
                  (apply hash-map)))))))
;; Prepared bolt that registers an ITaskHook on its context to count emits,
;; bolt acks and bolt fails. On each tuple it emits the current
;; [emitted acked failed] counters (unanchored) and then acks the input when
;; the ack and fail counts are equal, or fails it otherwise.
(defbolt hooks-bolt ["emit" "ack" "fail"] {:prepare true}
  [conf context collector]
  (let [ack-count (atom 0)
        fail-count (atom 0)
        emit-count (atom 0)
        ;; hook whose only job is to bump the counters above
        counting-hook (reify backtype.storm.hooks.ITaskHook
                        (prepare [this conf context])
                        (cleanup [this])
                        (emit [this info]
                          (swap! emit-count inc))
                        (boltAck [this info]
                          (swap! ack-count inc))
                        (boltFail [this info]
                          (swap! fail-count inc)))]
    (.addTaskHook context counting-hook)
    (bolt
      (execute [tuple]
        (emit-bolt! collector [@emit-count @ack-count @fail-count])
        (if (zero? (- @ack-count @fail-count))
          (ack! collector tuple)
          (fail! collector tuple))))))
;; Verifies that task hooks observe emits, acks and fails: four tuples are
;; pushed through hooks-bolt and the [emitted acked failed] counters it reports
;; for each one must match the expected progression exactly and in order.
(deftest test-hooks
  (with-simulated-time-local-cluster [cluster]
    (let [spouts {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))}
          bolts {"2" (bolt-spec {"1" :shuffle} hooks-bolt)}
          ;; four identical input tuples
          inputs [[1] [1] [1] [1]]
          results (complete-topology cluster
                                     (topology spouts bolts)
                                     :mock-sources {"1" inputs})]
      (is (= [[0 0 0]
              [2 1 0]
              [4 1 1]
              [6 2 1]]
             (read-tuples results "2"))))))
;; Placeholder test: the body is empty, so it currently passes without
;; exercising anything.
(deftest test-acking-branching-complex
;; TODO: test acking with branching in the topology
)
;; Placeholder test: the body is empty, so it currently passes without
;; exercising anything. Planned coverage is listed below.
(deftest test-fields-grouping
;; TODO 1. put a large number of random tuples through it and test that counts are right
;; TODO 2. test that different spouts with different parallelism hints group the same way
)
;; Placeholder test: the body is empty, so it currently passes without
;; exercising anything. TODO: add coverage for the "all" grouping.
(deftest test-all-grouping
)
;; Placeholder test: the body is empty, so it currently passes without
;; exercising anything. TODO: add coverage for the direct grouping.
(deftest test-direct-grouping
)