| (ns backtype.storm.integration-test |
| (:use [clojure test]) |
| (:import [backtype.storm.topology TopologyBuilder]) |
| (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt]) |
| (:use [backtype.storm bootstrap testing]) |
| (:use [backtype.storm.daemon common]) |
| ) |
| |
| ;; Invoke the `bootstrap` macro (presumably from backtype.storm.bootstrap, which the ns |
| ;; form :use-s). The tests below rely on names that the ns form does not import directly |
| ;; (e.g. `thrift/...`, `AckTracker`, `Fields`, the TOPOLOGY-* keys), so this call is |
| ;; assumed to bring them into scope -- TODO confirm against the macro definition. |
| (bootstrap) |
| |
| ;; (deftest test-counter |
| ;; (with-local-cluster [cluster :supervisors 4] |
| ;; (let [state (:storm-cluster-state cluster) |
| ;; nimbus (:nimbus cluster) |
| ;; topology (thrift/mk-topology |
| ;; {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} |
| ;; {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4) |
| ;; "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.)) |
| ;; "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.)) |
| ;; })] |
| ;; (submit-local-topology nimbus |
| ;; "counter" |
| ;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} |
| ;; topology) |
| ;; (Thread/sleep 10000) |
| ;; (.killTopology nimbus "counter") |
| ;; (Thread/sleep 10000) |
| ;; ))) |
| |
| ;; (deftest test-multilang-fy |
| ;; (with-local-cluster [cluster :supervisors 4] |
| ;; (let [nimbus (:nimbus cluster) |
| ;; topology (thrift/mk-topology |
| ;; {"1" (thrift/mk-spout-spec (TestWordSpout. false))} |
| ;; {"2" (thrift/mk-shell-bolt-spec {"1" :shuffle} "fancy" "tester.fy" ["word"] :parallelism-hint 1)} |
| ;; )] |
| ;; (submit-local-topology nimbus |
| ;; "test" |
| ;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} |
| ;; topology) |
| ;; (Thread/sleep 10000) |
| ;; (.killTopology nimbus "test") |
| ;; (Thread/sleep 10000) |
| ;; ))) |
| |
| ;; Smoke test for Ruby multilang components: a Ruby shell spout ("1") feeds a Ruby shell |
| ;; bolt ("2") on a local cluster started with :supervisors 4. The topology is submitted, |
| ;; left running for ten seconds, killed, and given another ten seconds. No assertions. |
| (deftest test-multilang-rb |
|   (with-local-cluster [cluster :supervisors 4] |
|     (let [nimbus (:nimbus cluster) |
|           ruby-spout (thrift/mk-shell-spout-spec ["ruby" "tester_spout.rb"] ["word"]) |
|           ruby-bolt (thrift/mk-shell-bolt-spec {"1" :shuffle} |
|                                                "ruby" "tester_bolt.rb" ["word"] |
|                                                :parallelism-hint 1) |
|           topology (thrift/mk-topology {"1" ruby-spout} {"2" ruby-bolt}) |
|           storm-conf {TOPOLOGY-OPTIMIZE false |
|                       TOPOLOGY-WORKERS 20 |
|                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 |
|                       TOPOLOGY-DEBUG true}] |
|       (submit-local-topology nimbus "test" storm-conf topology) |
|       ;; let the shell components run for a while before tearing the topology down |
|       (Thread/sleep 10000) |
|       (.killTopology nimbus "test") |
|       (Thread/sleep 10000)))) |
| |
| |
| ;; Smoke test for Python multilang components: a Python shell spout ("1") feeds a Python |
| ;; shell bolt ("2") on a local cluster started with :supervisors 4. The topology is |
| ;; submitted, left running for ten seconds, killed, and given another ten seconds. |
| ;; No assertions are made. |
| (deftest test-multilang-py |
|   (with-local-cluster [cluster :supervisors 4] |
|     (let [nimbus (:nimbus cluster) |
|           py-spout (thrift/mk-shell-spout-spec ["python" "tester_spout.py"] ["word"]) |
|           py-bolt (thrift/mk-shell-bolt-spec {"1" :shuffle} |
|                                              ["python" "tester_bolt.py"] ["word"] |
|                                              :parallelism-hint 1) |
|           topology (thrift/mk-topology {"1" py-spout} {"2" py-bolt}) |
|           storm-conf {TOPOLOGY-OPTIMIZE false |
|                       TOPOLOGY-WORKERS 20 |
|                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 |
|                       TOPOLOGY-DEBUG true}] |
|       (submit-local-topology nimbus "test" storm-conf topology) |
|       ;; let the shell components run for a while before tearing the topology down |
|       (Thread/sleep 10000) |
|       (.killTopology nimbus "test") |
|       (Thread/sleep 10000)))) |
| |
| |
| ;; Runs a word-count style topology to completion on a simulated-time local cluster, once |
| ;; with STORM-LOCAL-MODE-ZMQ true and once with it false, and checks what each component |
| ;; emitted. Components "1" and "2" are compared with ms=, "3" and "4" with plain =. |
| (deftest test-basic-topology |
|   (doseq [zmq-on? [true false]] |
|     (with-simulated-time-local-cluster [cluster :supervisors 4 |
|                                         :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}] |
|       (let [word-spout (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3) |
|             word-counter (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) |
|                                               :parallelism-hint 4) |
|             global-count (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.)) |
|             agg-counter (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.)) |
|             topology (thrift/mk-topology {"1" word-spout} |
|                                          {"2" word-counter |
|                                           "3" global-count |
|                                           "4" agg-counter}) |
|             input [["nathan"] ["bob"] ["joey"] ["nathan"]] |
|             results (complete-topology cluster |
|                                        topology |
|                                        :mock-sources {"1" input} |
|                                        :storm-conf {TOPOLOGY-WORKERS 2})] |
|         ;; the mocked spout must replay exactly the tuples it was given |
|         (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]] |
|                  (read-tuples results "1"))) |
|         (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]] |
|                  (read-tuples results "2"))) |
|         (is (= [[1] [2] [3] [4]] |
|                (read-tuples results "3"))) |
|         (is (= [[1] [2] [3] [4]] |
|                (read-tuples results "4"))))))) |
| |
| ;; Pass-through bolt with output field "num": re-emits the values of the incoming tuple, |
| ;; anchored to that tuple, and then acks it. |
| (defbolt identity-bolt ["num"] |
|   [tup out] |
|   (let [values (.getValues tup)] |
|     (emit-bolt! out values :anchor tup) |
|     (ack! out tup))) |
| |
| ;; Bolt "2" subscribes both to spout "1"'s default stream (fields grouping on "word") and |
| ;; to its "__system" stream (global grouping). The assertion expects the three mocked |
| ;; words plus three ["startup"] tuples; the spout is declared with :p 3. |
| (deftest test-system-stream |
|   ;; this test works because mocking a spout splits up the tuples evenly among the tasks |
|   (with-simulated-time-local-cluster [cluster] |
|     (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)} |
|           inputs {"1" ["word"] |
|                   ["1" "__system"] :global} |
|           bolts {"2" (thrift/mk-bolt-spec inputs identity-bolt :p 1)} |
|           topology (thrift/mk-topology spouts bolts) |
|           results (complete-topology cluster |
|                                      topology |
|                                      :mock-sources {"1" [["a"] ["b"] ["c"]]} |
|                                      :storm-conf {TOPOLOGY-WORKERS 2})] |
|       (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]] |
|                (read-tuples results "2")))))) |
| |
| ;; Feeds 24 tuples from a spout with parallelism hint 4 through a shuffle grouping into |
| ;; TestGlobalCount with parallelism hint 6. The assertion expects the multiset |
| ;; [1] [2] [3] [4] repeated six times from bolt "2". |
| (deftest test-shuffle |
|   (with-simulated-time-local-cluster [cluster :supervisors 4] |
|     (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)} |
|           bolts {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.) |
|                                           :parallelism-hint 6)} |
|           topology (thrift/mk-topology spouts bolts) |
|           ;; important for test that |
|           ;; #tuples = multiple of 4 and 6 |
|           ;; (24 tuples: ["a"] ["b"] alternating, twelve times over) |
|           source-tuples (vec (take 24 (cycle [["a"] ["b"]]))) |
|           results (complete-topology cluster |
|                                      topology |
|                                      :mock-sources {"1" source-tuples})] |
|       (is (ms= (mapcat identity (repeat 6 [[1] [2] [3] [4]])) |
|                (read-tuples results "2")))))) |
| |
| ;; Simple (non-prepared) bolt with output field "word": destructures the first value of |
| ;; the incoming tuple, emits it with "lalala" appended (anchored to the input), then acks. |
| (defbolt lalala-bolt1 ["word"] [[word :as tup] out] |
|   (emit-bolt! out [(str word "lalala")] :anchor tup) |
|   (ack! out tup)) |
| |
| ;; Prepared bolt with output field "word": holds the suffix "lalala" in an atom created |
| ;; in the prepare body, and for every tuple emits value 0 with that suffix appended |
| ;; (anchored to the input) before acking. |
| (defbolt lalala-bolt2 ["word"] {:prepare true} |
|   [conf context collector] |
|   (let [suffix (atom "lalala")] |
|     (bolt |
|       (execute [tup] |
|         (let [word (.getValue tup 0)] |
|           (emit-bolt! collector [(str word @suffix)] :anchor tup) |
|           (ack! collector tup)))))) |
| |
| ;; Parameterised, prepared bolt with output field "word". Its explicit `prepare` method |
| ;; stores (str prefix "lalala") in an atom; `execute` emits value 0 of the tuple with |
| ;; that stored suffix appended (anchored to the input) and acks. |
| (defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]} |
|   [conf context collector] |
|   (let [suffix (atom nil)] |
|     (bolt |
|       (prepare [_ _ _] |
|         (reset! suffix (str prefix "lalala"))) |
|       ;; the map destructuring of the tuple is kept even though `val` is not used |
|       (execute [{val "word" :as tup}] |
|         (emit-bolt! collector [(str (.getValue tup 0) @suffix)] :anchor tup) |
|         (ack! collector tup))))) |
| |
| ;; Runs the three Clojure-defined lalala bolts against the same mocked spout and checks |
| ;; the output of each: bolts "2" and "3" append "lalala", bolt "4" (built with the |
| ;; parameter "_nathan_") appends "_nathan_lalala". |
| (deftest test-clojure-bolt |
|   (with-simulated-time-local-cluster [cluster :supervisors 4] |
|     (let [nimbus (:nimbus cluster) |
|           word-spout (thrift/mk-spout-spec (TestWordSpout. false)) |
|           simple-bolt (thrift/mk-bolt-spec {"1" :shuffle} lalala-bolt1) |
|           prepared-bolt (thrift/mk-bolt-spec {"1" :local-or-shuffle} lalala-bolt2) |
|           param-bolt (thrift/mk-bolt-spec {"1" :shuffle} (lalala-bolt3 "_nathan_")) |
|           topology (thrift/mk-topology {"1" word-spout} |
|                                        {"2" simple-bolt |
|                                         "3" prepared-bolt |
|                                         "4" param-bolt}) |
|           results (complete-topology cluster |
|                                      topology |
|                                      :mock-sources {"1" [["david"] ["adam"]]})] |
|       (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2"))) |
|       (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3"))) |
|       (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4")))))) |
| |
| ;; Bolt that emits a map rather than a vector of values. For the word "bar" it emits a |
| ;; literal map mixing keyword keys with the string key "exclamation"; for any other word |
| ;; it emits the input tuple with :period, :exclamation and :question entries assoc'ed on. |
| ;; The emit is not anchored; the input tuple is acked afterwards in both cases. |
| (defbolt punctuator-bolt ["word" "period" "question" "exclamation"] |
|   [tuple collector] |
|   (let [word (:word tuple) |
|         out (if (= word "bar") |
|               {:word "bar" :period "bar" :question "bar" |
|                "exclamation" "bar"} |
|               (assoc tuple |
|                      :period (str word ".") |
|                      :exclamation (str word "!") |
|                      :question (str word "?")))] |
|     (emit-bolt! collector out) |
|     (ack! collector tuple))) |
| |
| ;; Checks that maps emitted by punctuator-bolt come out of the "out" component as value |
| ;; lists ordered word, period, question, exclamation, for both the "foo" and "bar" inputs. |
| (deftest test-map-emit |
|   (with-simulated-time-local-cluster [cluster :supervisors 4] |
|     (let [spouts {"words" (thrift/mk-spout-spec (TestWordSpout. false))} |
|           bolts {"out" (thrift/mk-bolt-spec {"words" :shuffle} punctuator-bolt)} |
|           topology (thrift/mk-topology spouts bolts) |
|           results (complete-topology cluster |
|                                      topology |
|                                      :mock-sources {"words" [["foo"] ["bar"]]})] |
|       (is (ms= [["foo" "foo." "foo?" "foo!"] |
|                 ["bar" "bar" "bar" "bar"]] |
|                (read-tuples results "out")))))) |
| |
| |
| ;; Builds a feeder spout for `fields` whose ack/fail delegate is a fresh AckTracker. |
| ;; Returns a two-element vector [spout checker]. `checker` is a one-argument function |
| ;; that asserts (with `is`) that the tracker's ack count equals the given number and |
| ;; then resets that count. |
| (defn ack-tracking-feeder [fields] |
|   (let [tracker (AckTracker.) |
|         spout (feeder-spout fields) |
|         checker (fn [expected] |
|                   (is (= (.getNumAcks tracker) expected)) |
|                   (.resetNumAcks tracker))] |
|     (.setAckFailDelegate spout tracker) |
|     [spout checker])) |
| |
| ;; Bolt parameterised by `amt`, with output field "num": for each input tuple it emits |
| ;; the numbers 0 .. amt-1 as separate tuples, each anchored to the input, then acks. |
| (defbolt branching-bolt ["num"] |
|   {:params [amt]} |
|   [tup out] |
|   (doseq [n (range amt)] |
|     (emit-bolt! out [n] :anchor tup)) |
|   (ack! out tup)) |
| |
| ;; Prepared bolt parameterised by `amt`, with output field "num". It buffers incoming |
| ;; tuples in an atom; once the buffer holds exactly `amt` tuples it emits [1] anchored to |
| ;; all of them, acks each one, and empties the buffer. Tuples are not acked until then. |
| (defbolt agg-bolt ["num"] {:prepare true :params [amt]} |
|   [conf context collector] |
|   (let [pending (atom [])] |
|     (bolt |
|       (execute [tup] |
|         (let [batch (swap! pending conj tup)] |
|           (when (= (count batch) amt) |
|             (emit-bolt! collector [1] :anchor batch) |
|             (doseq [buffered batch] |
|               (ack! collector buffered)) |
|             (reset! pending []))))))) |
| |
| ;; Terminal bolt with an empty output spec: acks every tuple it receives, emits nothing. |
| (defbolt ack-bolt {} |
|   [tup out] |
|   (ack! out tup)) |
| |
| ;; Exercises ack tracking through a topology that fans out, aggregates and fans out again. |
| ;; Spouts "1", "2", "3" feed branching bolts emitting 2, 4 and 1 tuples respectively. |
| ;; All three feed agg-bolt "7", which only emits and acks once it has buffered 3 tuples. |
| ;; "7" feeds another branching bolt "8" and finally ack-bolt "9". |
| ;; Each checkerN call asserts the number of acks its spout has seen since the previous |
| ;; check and then resets that counter (see ack-tracking-feeder). |
| ;; NOTE(review): the expected counts below assume bolt "7" runs as a single task so that |
| ;; all tuples share one buffer -- confirm the default parallelism. |
| (deftest test-acking |
| (with-tracked-cluster [cluster] |
| (let [[feeder1 checker1] (ack-tracking-feeder ["num"]) |
| [feeder2 checker2] (ack-tracking-feeder ["num"]) |
| [feeder3 checker3] (ack-tracking-feeder ["num"]) |
| tracked (mk-tracked-topology |
| cluster |
| (topology |
| {"1" (spout-spec feeder1) |
| "2" (spout-spec feeder2) |
| "3" (spout-spec feeder3)} |
| {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2)) |
| "5" (bolt-spec {"2" :shuffle} (branching-bolt 4)) |
| "6" (bolt-spec {"3" :shuffle} (branching-bolt 1)) |
| "7" (bolt-spec |
| {"4" :shuffle |
| "5" :shuffle |
| "6" :shuffle} |
| (agg-bolt 3)) |
| "8" (bolt-spec {"7" :shuffle} (branching-bolt 2)) |
| "9" (bolt-spec {"8" :shuffle} ack-bolt)} |
| ))] |
| (submit-local-topology (:nimbus cluster) |
| "acking-test1" |
| {} |
| (:topology tracked)) |
| ;; feeder1 -> 2 tuples buffered in "7" (below the threshold of 3): nothing acked yet |
| (.feed feeder1 [1]) |
| (tracked-wait tracked 1) |
| (checker1 0) |
| ;; feeder2 -> 4 more tuples: the buffer reaches 3 twice, so both spout tuples complete |
| (.feed feeder2 [1]) |
| (tracked-wait tracked 1) |
| (checker1 1) |
| (checker2 1) |
| ;; feeder1 -> 2 tuples buffered again: still incomplete |
| (.feed feeder1 [1]) |
| (tracked-wait tracked 1) |
| (checker1 0) |
| ;; feeder1 -> the buffer reaches 3, completing the previous feed; 1 tuple of this feed |
| ;; stays buffered, so only one ack is expected |
| (.feed feeder1 [1]) |
| (tracked-wait tracked 1) |
| (checker1 1) |
| ;; feeder3 -> 1 tuple, buffer now holds 2: neither spout tuple is complete |
| (.feed feeder3 [1]) |
| (tracked-wait tracked 1) |
| (checker1 0) |
| (checker3 0) |
| ;; feeder2 -> 4 tuples: the buffer is flushed twice, completing all outstanding tuples |
| (.feed feeder2 [1]) |
| (tracked-wait tracked 1) |
| (checker1 1) |
| (checker2 1) |
| (checker3 1) |
| |
| ))) |
| |
| ;; Exercises acking when one spout tuple is delivered to two bolts. Spout "1" feeds |
| ;; identity bolts "2" and "3", both of which feed agg-bolt "4" with a threshold of 4. |
| ;; Each fed tuple therefore contributes two tuples to the aggregator's buffer. |
| ;; NOTE(review): the expected counts assume bolt "4" runs as a single task -- confirm. |
| (deftest test-ack-branching |
| (with-tracked-cluster [cluster] |
| (let [[feeder checker] (ack-tracking-feeder ["num"]) |
| tracked (mk-tracked-topology |
| cluster |
| (topology |
| {"1" (spout-spec feeder)} |
| {"2" (bolt-spec {"1" :shuffle} identity-bolt) |
| "3" (bolt-spec {"1" :shuffle} identity-bolt) |
| "4" (bolt-spec |
| {"2" :shuffle |
| "3" :shuffle} |
| (agg-bolt 4))}))] |
| (submit-local-topology (:nimbus cluster) |
| "test-acking2" |
| {} |
| (:topology tracked)) |
| ;; first feed: 2 tuples buffered in "4" (below the threshold of 4), so no ack yet |
| (.feed feeder [1]) |
| (tracked-wait tracked 1) |
| (checker 0) |
| ;; second feed: the buffer reaches 4 and is acked as a whole, completing both feeds |
| (.feed feeder [1]) |
| (tracked-wait tracked 1) |
| (checker 2) |
| ))) |
| |
| ;; Bolt with output field "num" that emits [1] anchored to the same input tuple twice |
| ;; (the anchor list contains the tuple two times) and then acks the input. |
| (defbolt dup-anchor ["num"] |
|   [tup out] |
|   (let [anchors [tup tup]] |
|     (emit-bolt! out [1] :anchor anchors) |
|     (ack! out tup))) |
| |
| ;; Checks that spout tuples are still acked when a bolt anchors an emitted tuple to the |
| ;; same input twice (dup-anchor): one ack is expected after feeding one tuple, and three |
| ;; acks after feeding three more. |
| (deftest test-acking-self-anchor |
|   (with-tracked-cluster [cluster] |
|     (let [[feeder checker] (ack-tracking-feeder ["num"]) |
|           topo (topology |
|                  {"1" (spout-spec feeder)} |
|                  {"2" (bolt-spec {"1" :shuffle} dup-anchor) |
|                   "3" (bolt-spec {"2" :shuffle} ack-bolt)}) |
|           tracked (mk-tracked-topology cluster topo)] |
|       (submit-local-topology (:nimbus cluster) "test" {} (:topology tracked)) |
|       ;; a single tuple |
|       (.feed feeder [1]) |
|       (tracked-wait tracked 1) |
|       (checker 1) |
|       ;; three tuples fed back to back, then waited on together |
|       (dotimes [_ 3] |
|         (.feed feeder [1])) |
|       (tracked-wait tracked 3) |
|       (checker 3)))) |
| |
| ;; (defspout ConstantSpout ["val"] {:prepare false} |
| ;; [collector] |
| ;; (Time/sleep 100) |
| ;; (emit-spout! collector [1])) |
| |
| ;; (def errored (atom false)) |
| ;; (def restarted (atom false)) |
| |
| ;; (defbolt local-error-checker {} [tuple collector] |
| ;; (when-not @errored |
| ;; (reset! errored true) |
| ;; (println "erroring") |
| ;; (throw (RuntimeException.))) |
| ;; (when-not @restarted (println "restarted")) |
| ;; (reset! restarted true)) |
| |
| ;; (deftest test-no-halt-local-mode |
| ;; (with-simulated-time-local-cluster [cluster] |
| ;; (let [topology (topology |
| ;; {1 (spout-spec ConstantSpout)} |
| ;; {2 (bolt-spec {1 :shuffle} local-error-checker) |
| ;; })] |
| ;; (submit-local-topology (:nimbus cluster) |
| ;; "test" |
| ;; {} |
| ;; topology) |
| ;; (while (not @restarted) |
| ;; (advance-time-ms! 100)) |
| ;; ))) |
| |
| ;; Spout with output field "word". Every nextTuple call sleeps 100 ms and emits the |
| ;; current counter value with message id 1; the counter is incremented on each ack. |
| (defspout IncSpout ["word"] |
|   [conf context collector] |
|   (let [acked-count (atom 0)] |
|     (spout |
|       (nextTuple [] |
|         (Thread/sleep 100) |
|         (let [current @acked-count] |
|           (emit-spout! collector [current] :id 1))) |
|       (ack [id] |
|         (swap! acked-count inc))))) |
| |
| |
| ;; Spout parameterised by `prefix`, with output field "word". Every nextTuple call sleeps |
| ;; 100 ms, increments an internal counter, and emits the string "<prefix>-<counter>" |
| ;; without a message id. |
| (defspout IncSpout2 ["word"] {:params [prefix]} |
|   [conf context collector] |
|   (let [counter (atom 0)] |
|     (spout |
|       (nextTuple [] |
|         (Thread/sleep 100) |
|         (let [n (swap! counter inc)] |
|           (emit-spout! collector [(str prefix "-" n)])))))) |
| |
| ;; (deftest test-clojure-spout |
| ;; (with-local-cluster [cluster] |
| ;; (let [nimbus (:nimbus cluster) |
| ;; top (topology |
| ;; {1 (spout-spec IncSpout)} |
| ;; {} |
| ;; )] |
| ;; (submit-local-topology nimbus |
| ;; "spout-test" |
| ;; {TOPOLOGY-DEBUG true |
| ;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3} |
| ;; top) |
| ;; (Thread/sleep 10000) |
| ;; (.killTopology nimbus "spout-test") |
| ;; (Thread/sleep 10000) |
| ;; ))) |
| |
| |
| ;; Checks which configuration values a Java-built component sees. The topology is built |
| ;; with TopologyBuilder: a TestPlannerSpout "1" shuffles into a TestConfBolt "2" that is |
| ;; constructed with its own conf map and further configured via the bolt declarer. |
| ;; The mocked spout emits config key names; the test collects bolt "2"'s output into a |
| ;; key -> value map and compares it with the expected values. |
| (deftest test-component-specific-config |
| (with-simulated-time-local-cluster [cluster |
| :daemon-conf {TOPOLOGY-OPTIMIZE false |
| TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}] |
| (letlocals |
| (bind builder (TopologyBuilder.)) |
| (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"]))) |
| (-> builder |
| (.setBolt "2" |
| (TestConfBolt. |
| ;; component-level conf handed to the bolt itself |
| {"fake.config" 123 |
| TOPOLOGY-MAX-TASK-PARALLELISM 20 |
| TOPOLOGY-MAX-SPOUT-PENDING 30 |
| TOPOLOGY-OPTIMIZE true |
| TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"} |
| {"fake.type2" "a.serializer"}] |
| })) |
| (.shuffleGrouping "1") |
| ;; the assertion below expects this 2 to win over the 20 given in the bolt's conf map |
| (.setMaxTaskParallelism 2) |
| (.addConfiguration "fake.config2" 987) |
| ) |
| |
| |
| (bind results |
| (complete-topology cluster |
| (.createTopology builder) |
| ;; topology-level registrations; expected to be merged with the component's |
| :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]} |
| :mock-sources {"1" [["fake.config"] |
| [TOPOLOGY-MAX-TASK-PARALLELISM] |
| [TOPOLOGY-MAX-SPOUT-PENDING] |
| [TOPOLOGY-OPTIMIZE] |
| ["fake.config2"] |
| [TOPOLOGY-KRYO-REGISTER] |
| ]})) |
| ;; Expected: TOPOLOGY-OPTIMIZE stays false (the daemon-conf value) although the |
| ;; component conf sets it to true, and "fake.type" resolves to "good.serializer" |
| ;; (from :storm-conf) rather than the component's "bad.serializer". |
| (is (= {"fake.config" 123 |
| "fake.config2" 987 |
| TOPOLOGY-MAX-TASK-PARALLELISM 2 |
| TOPOLOGY-MAX-SPOUT-PENDING 30 |
| TOPOLOGY-OPTIMIZE false |
| TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer" |
| "fake.type2" "a.serializer" |
| "fake.type3" "a.serializer3"}} |
| ;; flatten the [key value] tuples emitted by bolt "2" into a single map |
| (->> (read-tuples results "2") |
| (apply concat) |
| (apply hash-map)) |
| )) |
| ))) |
| |
| ;; Prepared bolt with output fields "conf" and "val". Its `conf` parameter is also passed |
| ;; as the bolt's :conf option. For each tuple it reads value 0 as a query name and emits |
| ;; [name value], anchored to the input, then acks. The value is |
| ;; (.maxTopologyMessageTimeout context) for the special name "!MAX_MSG_TIMEOUT", and |
| ;; otherwise a lookup of the name in the `conf` argument received at prepare time. |
| (defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf} |
|   [conf context collector] |
|   (bolt |
|     (execute [tup] |
|       (let [query (.getValue tup 0) |
|             answer (if (= query "!MAX_MSG_TIMEOUT") |
|                      (.maxTopologyMessageTimeout context) |
|                      (get conf query))] |
|         (emit-bolt! collector [query answer] :anchor tup) |
|         (ack! collector tup))))) |
| |
| ;; Checks which configuration values a component built with the Clojure DSL sees. |
| ;; The spout carries :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}. The bolt is a |
| ;; conf-query-bolt with its own parameter map plus a :conf override on the bolt spec. |
| ;; The mocked spout emits config key names and the bolt answers with [key value] pairs, |
| ;; which the assertion collects into a map. |
| (deftest test-component-specific-config-clojure |
|   (with-simulated-time-local-cluster [cluster] |
|     (let [conf-spout (spout-spec (TestPlannerSpout. (Fields. ["conf"])) |
|                                  :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}) |
|           query-bolt (bolt-spec {"1" :shuffle} |
|                                 (conf-query-bolt {"fake.config" 1 |
|                                                   TOPOLOGY-MAX-TASK-PARALLELISM 2 |
|                                                   TOPOLOGY-MAX-SPOUT-PENDING 10}) |
|                                 :conf {TOPOLOGY-MAX-SPOUT-PENDING 3}) |
|           topo (topology {"1" conf-spout} {"2" query-bolt}) |
|           results (complete-topology cluster |
|                                      topo |
|                                      :topology-name "test123" |
|                                      :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10 |
|                                                   TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} |
|                                      :mock-sources {"1" [["fake.config"] |
|                                                          [TOPOLOGY-MAX-TASK-PARALLELISM] |
|                                                          [TOPOLOGY-MAX-SPOUT-PENDING] |
|                                                          ["!MAX_MSG_TIMEOUT"] |
|                                                          [TOPOLOGY-NAME]]})] |
|       ;; flatten the emitted [key value] tuples into one map before comparing |
|       (is (= {"fake.config" 1 |
|               TOPOLOGY-MAX-TASK-PARALLELISM 2 |
|               TOPOLOGY-MAX-SPOUT-PENDING 3 |
|               "!MAX_MSG_TIMEOUT" 40 |
|               TOPOLOGY-NAME "test123"} |
|              (apply hash-map (apply concat (read-tuples results "2")))))))) |
| |
| ;; Prepared bolt with output fields "emit", "ack", "fail". At prepare time it registers |
| ;; an ITaskHook on the context that counts emit, boltAck and boltFail callbacks in atoms. |
| ;; For each tuple it emits the three current counts (unanchored), then acks the tuple |
| ;; when the ack and fail counts are equal and fails it otherwise. |
| (defbolt hooks-bolt ["emit" "ack" "fail"] {:prepare true} |
|   [conf context collector] |
|   (let [n-acked (atom 0) |
|         n-failed (atom 0) |
|         n-emitted (atom 0) |
|         counting-hook (reify backtype.storm.hooks.ITaskHook |
|                         (prepare [this conf context]) |
|                         (cleanup [this]) |
|                         (emit [this info] |
|                           (swap! n-emitted inc)) |
|                         (boltAck [this info] |
|                           (swap! n-acked inc)) |
|                         (boltFail [this info] |
|                           (swap! n-failed inc)))] |
|     (.addTaskHook context counting-hook) |
|     (bolt |
|       (execute [tup] |
|         (emit-bolt! collector [@n-emitted @n-acked @n-failed]) |
|         ;; alternate between acking and failing depending on the counters so far |
|         (if (zero? (- @n-acked @n-failed)) |
|           (ack! collector tup) |
|           (fail! collector tup)))))) |
| |
| ;; Feeds four tuples through hooks-bolt and checks the [emitted acked failed] counts it |
| ;; reports for each, in order (compared with plain =). |
| (deftest test-hooks |
|   (with-simulated-time-local-cluster [cluster] |
|     (let [spouts {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))} |
|           bolts {"2" (bolt-spec {"1" :shuffle} hooks-bolt)} |
|           topo (topology spouts bolts) |
|           results (complete-topology cluster |
|                                      topo |
|                                      :mock-sources {"1" (vec (repeat 4 [1]))})] |
|       (is (= [[0 0 0] |
|               [2 1 0] |
|               [4 1 1] |
|               [6 2 1]] |
|              (read-tuples results "2")))))) |
| |
| ;; TODO: placeholder -- this test has no body yet and therefore makes no assertions. |
| (deftest test-acking-branching-complex |
| ;; test acking with branching in the topology |
| ) |
| |
| |
| ;; TODO: placeholder -- this test has no body yet and therefore makes no assertions. |
| ;; Planned coverage: |
| (deftest test-fields-grouping |
| ;; 1. put a large number of random tuples through it and test that the counts are right |
| ;; 2. test that different spouts with different parallelism hints group the same way |
| ) |
| |
| ;; TODO: placeholder -- this test has no body yet and therefore makes no assertions. |
| (deftest test-all-grouping |
| ) |
| |
| ;; TODO: placeholder -- this test has no body yet and therefore makes no assertions. |
| (deftest test-direct-grouping |
| ) |