| (ns backtype.storm.integration-test |
| (:use [clojure test]) |
| (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter]) |
| (:use [backtype.storm bootstrap testing]) |
| (:use [backtype.storm.daemon common]) |
| ) |
| |
| (bootstrap) |
| |
| ;; (deftest test-counter |
| ;; (with-local-cluster [cluster :supervisors 4] |
| ;; (let [state (:storm-cluster-state cluster) |
| ;; nimbus (:nimbus cluster) |
| ;; topology (thrift/mk-topology |
| ;; {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} |
| ;; {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4) |
| ;; 3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.)) |
| ;; 4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.)) |
| ;; })] |
| ;; (submit-local-topology nimbus |
| ;; "counter" |
| ;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} |
| ;; topology) |
| ;; (Thread/sleep 10000) |
| ;; (.killTopology nimbus "counter") |
| ;; (Thread/sleep 10000) |
| ;; ))) |
| |
| ;; (deftest test-multilang-fy |
| ;; (with-local-cluster [cluster :supervisors 4] |
| ;; (let [nimbus (:nimbus cluster) |
| ;; topology (thrift/mk-topology |
| ;; {1 (thrift/mk-spout-spec (TestWordSpout. false))} |
| ;; {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "fancy" "tester.fy" ["word"] :parallelism-hint 1)} |
| ;; )] |
| ;; (submit-local-topology nimbus |
| ;; "test" |
| ;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} |
| ;; topology) |
| ;; (Thread/sleep 10000) |
| ;; (.killTopology nimbus "test") |
| ;; (Thread/sleep 10000) |
| ;; ))) |
| |
| ;; (deftest test-multilang-rb |
| ;; (with-local-cluster [cluster :supervisors 4] |
| ;; (let [nimbus (:nimbus cluster) |
| ;; topology (thrift/mk-topology |
| ;; {1 (thrift/mk-spout-spec (TestWordSpout. false))} |
| ;; {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "ruby" "tester.rb" ["word"] :parallelism-hint 1)} |
| ;; )] |
| ;; (submit-local-topology nimbus |
| ;; "test" |
| ;; {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} |
| ;; topology) |
| ;; (Thread/sleep 10000) |
| ;; (.killTopology nimbus "test") |
| ;; (Thread/sleep 10000) |
| ;; ))) |
| |
| |
| (deftest test-multilang-py |
| (with-local-cluster [cluster :supervisors 4] |
| (let [nimbus (:nimbus cluster) |
| topology (thrift/mk-topology |
| {1 (thrift/mk-spout-spec (TestWordSpout. false))} |
| {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "python" "tester.py" ["word"] :parallelism-hint 1)} |
| )] |
| (submit-local-topology nimbus |
| "test" |
| {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} |
| topology) |
| (Thread/sleep 10000) |
| (.killTopology nimbus "test") |
| (Thread/sleep 10000) |
| ))) |
| |
| |
| (deftest test-basic-topology |
| (doseq [zmq-on? [true false]] |
| (with-simulated-time-local-cluster [cluster :supervisors 4 |
| :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}] |
| (let [topology (thrift/mk-topology |
| {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} |
| {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4) |
| 3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.)) |
| 4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.)) |
| }) |
| results (complete-topology cluster |
| topology |
| :mock-sources {1 [["nathan"] ["bob"] ["joey"] ["nathan"]]} |
| :storm-conf {TOPOLOGY-DEBUG true |
| TOPOLOGY-WORKERS 2})] |
| (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]] |
| (read-tuples results 1))) |
| (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]] |
| (read-tuples results 2))) |
| (is (= [[1] [2] [3] [4]] |
| (read-tuples results 3))) |
| (is (= [[1] [2] [3] [4]] |
| (read-tuples results 4))) |
| )))) |
| |
| (deftest test-shuffle |
| (with-simulated-time-local-cluster [cluster :supervisors 4] |
| (let [topology (thrift/mk-topology |
| {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)} |
| {2 (thrift/mk-bolt-spec {1 :shuffle} (TestGlobalCount.) |
| :parallelism-hint 6) |
| }) |
| results (complete-topology cluster |
| topology |
| ;; important for test that |
| ;; #tuples = multiple of 4 and 6 |
| :mock-sources {1 [["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ["a"] ["b"] |
| ]} |
| )] |
| (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]])) |
| (read-tuples results 2))) |
| ))) |
| |
| (defbolt lalala-bolt1 ["word"] [tuple collector] |
| (let [ret (-> (.getValue tuple 0) (str "lalala"))] |
| (.emit collector tuple [ret]) |
| (.ack collector tuple) |
| )) |
| |
| (defboltfull lalala-bolt2 ["word"] |
| :let [state (atom nil)] |
| :prepare ([conf context collector] |
| (println "prepare") |
| (reset! state "lalala") |
| ) |
| :execute ([tuple collector] |
| (let [ret (-> (.getValue tuple 0) (str @state))] |
| (.emit collector tuple [ret]) |
| (.ack collector tuple) |
| ))) |
| |
| (defboltfull lalala-bolt3 ["word"] |
| :let [state (atom nil)] |
| :params [prefix] |
| :prepare ([conf context collector] |
| (reset! state (str prefix "lalala")) |
| ) |
| :execute ([tuple collector] |
| (let [ret (-> (.getValue tuple 0) (str @state))] |
| (.emit collector tuple [ret]) |
| (.ack collector tuple) |
| ))) |
| |
| (deftest test-clojure-bolt |
| (with-simulated-time-local-cluster [cluster :supervisors 4] |
| (let [nimbus (:nimbus cluster) |
| topology (thrift/mk-topology |
| {1 (thrift/mk-spout-spec (TestWordSpout. false))} |
| {2 (thrift/mk-bolt-spec {1 :shuffle} |
| lalala-bolt1) |
| 3 (thrift/mk-bolt-spec {1 :shuffle} |
| lalala-bolt2) |
| 4 (thrift/mk-bolt-spec {1 :shuffle} |
| (lalala-bolt3 "_nathan_"))} |
| ) |
| results (complete-topology cluster |
| topology |
| :mock-sources {1 [["david"] |
| ["adam"] |
| ]} |
| )] |
| (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 2))) |
| (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 3))) |
| (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results 4))) |
| ))) |
| |
| (defn ack-tracking-feeder [fields] |
| (let [tracker (AckTracker.)] |
| [(doto (feeder-spout fields) |
| (.setAckFailDelegate tracker)) |
| (fn [val] |
| (is (= (.getNumAcks tracker) val)) |
| (.resetNumAcks tracker) |
| )] |
| )) |
| |
| (defboltfull branching-bolt ["num"] |
| :params [amt] |
| :execute ([tuple collector] |
| (doseq [i (range amt)] |
| (.emit collector tuple [i])) |
| (.ack collector tuple) |
| )) |
| |
| (defboltfull agg-bolt ["num"] |
| :let [seen (atom [])] |
| :params [amt] |
| :execute ([tuple collector] |
| (swap! seen conj tuple) |
| (when (= (count @seen) amt) |
| (.emit collector @seen [1]) |
| (doseq [s @seen] |
| (.ack collector s)) |
| (reset! seen []) |
| ))) |
| |
| (defbolt ack-bolt {} |
| [tuple collector] |
| (.ack collector tuple)) |
| |
| (defbolt identity-bolt ["num"] |
| [tuple collector] |
| (.emit collector tuple (.getValues tuple)) |
| (.ack collector tuple)) |
| |
| (deftest test-acking |
| (with-tracked-cluster [cluster] |
| (let [[feeder1 checker1] (ack-tracking-feeder ["num"]) |
| [feeder2 checker2] (ack-tracking-feeder ["num"]) |
| [feeder3 checker3] (ack-tracking-feeder ["num"]) |
| tracked (mk-tracked-topology |
| {1 [feeder1] |
| 2 [feeder2] |
| 3 [feeder3]} |
| {4 [{1 :shuffle} (branching-bolt 2)] |
| 5 [{2 :shuffle} (branching-bolt 4)] |
| 6 [{3 :shuffle} (branching-bolt 1)] |
| 7 [{4 :shuffle |
| 5 :shuffle |
| 6 :shuffle} (agg-bolt 3)] |
| 8 [{7 :shuffle} (branching-bolt 2)] |
| 9 [{8 :shuffle} ack-bolt]} |
| )] |
| (submit-local-topology (:nimbus cluster) |
| "test" |
| {} |
| (:topology tracked)) |
| (.feed feeder1 [1]) |
| (tracked-wait tracked 1) |
| (checker1 0) |
| (.feed feeder2 [1]) |
| (tracked-wait tracked 1) |
| (checker1 1) |
| (checker2 1) |
| (.feed feeder1 [1]) |
| (tracked-wait tracked 1) |
| (checker1 0) |
| (.feed feeder1 [1]) |
| (tracked-wait tracked 1) |
| (checker1 1) |
| (.feed feeder3 [1]) |
| (tracked-wait tracked 1) |
| (checker1 0) |
| (checker3 0) |
| (.feed feeder2 [1]) |
| (tracked-wait tracked 1) |
| (checker1 1) |
| (checker2 1) |
| (checker3 1) |
| |
| ))) |
| |
| (deftest test-ack-branching |
| (with-tracked-cluster [cluster] |
| (let [[feeder checker] (ack-tracking-feeder ["num"]) |
| tracked (mk-tracked-topology |
| {1 [feeder]} |
| {2 [{1 :shuffle} identity-bolt] |
| 3 [{1 :shuffle} identity-bolt] |
| 4 [{2 :shuffle |
| 3 :shuffle} (agg-bolt 4)]})] |
| (submit-local-topology (:nimbus cluster) |
| "test" |
| {} |
| (:topology tracked)) |
| (.feed feeder [1]) |
| (tracked-wait tracked 1) |
| (checker 0) |
| (.feed feeder [1]) |
| (tracked-wait tracked 1) |
| (checker 2) |
| ))) |
| |
| (defbolt dup-anchor ["num"] |
| [tuple collector] |
| (.emit collector [tuple tuple] [1]) |
| (.ack collector tuple)) |
| |
| (deftest test-acking-self-anchor |
| (with-tracked-cluster [cluster] |
| (let [[feeder checker] (ack-tracking-feeder ["num"]) |
| tracked (mk-tracked-topology |
| {1 [feeder]} |
| {2 [{1 :shuffle} dup-anchor] |
| 3 [{2 :shuffle} ack-bolt]})] |
| (submit-local-topology (:nimbus cluster) |
| "test" |
| {} |
| (:topology tracked)) |
| (.feed feeder [1]) |
| (tracked-wait tracked 1) |
| (checker 1) |
| (.feed feeder [1]) |
| (.feed feeder [1]) |
| (.feed feeder [1]) |
| (tracked-wait tracked 3) |
| (checker 3) |
| ))) |
| |
| (deftest test-acking-branching-complex |
| ;; test acking with branching in the topology |
| ) |
| |
| |
| (deftest test-fields-grouping |
| ;; 1. put a shitload of random tuples through it and test that counts are right |
| ;; 2. test that different spouts with different phints group the same way |
| ) |
| |
| (deftest test-all-grouping |
| ) |
| |
| (deftest test-direct-grouping |
| ) |