blob: 7c8ec17b1881f999cff59d5024e3b4c2308957a5 [file] [log] [blame]
(ns backtype.storm.subtopology-test
(:use [clojure test])
(:import [backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.testing TestWordSpout PrepareBatchBolt BatchRepeatA BatchProcessWord BatchNumberList])
(:import [backtype.storm.coordination BatchSubtopologyBuilder])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
(bootstrap)
;; todo: need to configure coordinatedbolt with streams that aren't subscribed to, should auto-anchor those to the final
;; coordination tuple... find all streams that aren't subscribed to
;; having trouble with this test, commenting for now
;; (deftest test-batch-subtopology
;; (with-local-cluster [cluster :supervisors 4]
;; (letlocals
;; (bind builder (TopologyBuilder.))
;; (.setSpout builder "spout" (TestWordSpout.))
;; (-> (.setBolt builder "identity" (PrepareBatchBolt. (Fields. ["id" "word"])) 3)
;; (.shuffleGrouping "spout")
;; )
;; (bind batch-builder (BatchSubtopologyBuilder. "for-a" (BatchRepeatA.) 2))
;; (-> (.getMasterDeclarer batch-builder)
;; (.shuffleGrouping "identity"))
;; (-> (.setBolt batch-builder "process" (BatchProcessWord.) 2)
;; (.fieldsGrouping "for-a" "multi" (Fields. ["id"])))
;; (-> (.setBolt batch-builder "joiner" (BatchNumberList. "for-a") 2)
;; (.fieldsGrouping "process" (Fields. ["id"]))
;; (.fieldsGrouping "for-a" "single" (Fields. ["id"]))
;; )
;;
;; (.extendTopology batch-builder builder)
;;
;; (bind results (complete-topology cluster
;; (.createTopology builder)
;; :storm-conf {TOPOLOGY-DEBUG true}
;; :mock-sources {"spout" [
;; ["ccacccaa"]
;; ["bbb"]
;; ["ba"]
;; ]}
;; ))
;; (is (ms= [
;; ["ccacccaa" [2 6 7]]
;; ["bbb" []]
;; ["ba" [1]]
;; ]
;; (read-tuples results "joiner")))
;; )))