blob: 3c759b3727ca51ba8bd85068a636e903d14f35e5 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.subtopology-test
(:use [clojure test])
(:import [org.apache.storm.topology TopologyBuilder])
(:import [org.apache.storm.testing TestWordSpout PrepareBatchBolt BatchRepeatA BatchProcessWord BatchNumberList])
(:import [org.apache.storm.coordination BatchSubtopologyBuilder])
(:use [org.apache.storm testing])
(:use [org.apache.storm.daemon common]))
;; todo: need to configure coordinatedbolt with streams that aren't subscribed to, should auto-anchor those to the final
;; coordination tuple... find all streams that aren't subscribed to
;; having trouble with this test, commenting for now
;; (deftest test-batch-subtopology
;; (with-local-cluster [cluster :supervisors 4]
;; (letlocals
;; (bind builder (TopologyBuilder.))
;; (.setSpout builder "spout" (TestWordSpout.))
;; (-> (.setBolt builder "identity" (PrepareBatchBolt. (Fields. ["id" "word"])) 3)
;; (.shuffleGrouping "spout")
;; )
;; (bind batch-builder (BatchSubtopologyBuilder. "for-a" (BatchRepeatA.) 2))
;; (-> (.getMasterDeclarer batch-builder)
;; (.shuffleGrouping "identity"))
;; (-> (.setBolt batch-builder "process" (BatchProcessWord.) 2)
;; (.fieldsGrouping "for-a" "multi" (Fields. ["id"])))
;; (-> (.setBolt batch-builder "joiner" (BatchNumberList. "for-a") 2)
;; (.fieldsGrouping "process" (Fields. ["id"]))
;; (.fieldsGrouping "for-a" "single" (Fields. ["id"]))
;; )
;;
;; (.extendTopology batch-builder builder)
;;
;; (bind results (complete-topology cluster
;; (.createTopology builder)
;; :storm-conf {TOPOLOGY-DEBUG true}
;; :mock-sources {"spout" [
;; ["ccacccaa"]
;; ["bbb"]
;; ["ba"]
;; ]}
;; ))
;; (is (ms= [
;; ["ccacccaa" [2 6 7]]
;; ["bbb" []]
;; ["ba" [1]]
;; ]
;; (read-tuples results "joiner")))
;; )))