;; blob: 2bfa066573171bd873572dc655bb3004b070f857 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.grouping-test
(:use [clojure test])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
[backtype.storm.generated JavaObject JavaObjectArg])
(:use [backtype.storm testing clojure])
(:use [backtype.storm.daemon common])
(:require [backtype.storm [thrift :as thrift]]))
;; Shuffle grouping test: a 4-task word spout feeds a 6-task TestGlobalCount
;; bolt through a :shuffle grouping on a simulated-time local cluster.
;;
;; The mock source replays [["a"] ["b"]] twelve times (24 tuples). The
;; assertion expects the multiset made of six copies of [1] [2] [3] [4] on
;; bolt "2" -- presumably each of the six bolt tasks receives exactly four
;; tuples and emits a running count (TestGlobalCount is defined elsewhere;
;; confirm there).
(deftest test-shuffle
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [spout-tasks 4
          bolt-tasks  6
          ;; Important for the test: the number of tuples (2 * 12 = 24)
          ;; must be a multiple of both 4 and 6.
          word-tuples (apply concat (repeat 12 [["a"] ["b"]]))
          spouts      {"1" (thrift/mk-spout-spec (TestWordSpout. true)
                                                 :parallelism-hint spout-tasks)}
          bolts       {"2" (thrift/mk-bolt-spec {"1" :shuffle}
                                                (TestGlobalCount.)
                                                :parallelism-hint bolt-tasks)}
          topology    (thrift/mk-topology spouts bolts)
          results     (complete-topology cluster
                                         topology
                                         :mock-sources {"1" word-tuples})
          ;; One [1] [2] [3] [4] run per bolt task.
          expected    (apply concat (repeat bolt-tasks [[1] [2] [3] [4]]))]
      ;; ms= compares as multisets, so emission order does not matter.
      (is (ms= expected
               (read-tuples results "2"))))))
;; Fields grouping test: a word spout feeds a TestWordBytesCounter bolt that
;; is grouped on the "word" field, with the words supplied as byte arrays.
;;
;; The mock source replays the pair of byte-array tuples for "a" and "b"
;; (* spout-tasks bolt-tasks) = 24 times. The assertion expects, for each of
;; "a" and "b", the tuples [word 1] .. [word 24] on bolt "2" -- presumably
;; because every occurrence of a given word lands on the same bolt task, which
;; emits a running per-word count (TestWordBytesCounter is defined elsewhere;
;; confirm there).
(deftest test-field
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [spout-tasks 4
          bolt-tasks  6
          rounds      (* spout-tasks bolt-tasks)
          spouts      {"1" (thrift/mk-spout-spec (TestWordSpout. true)
                                                 :parallelism-hint spout-tasks)}
          bolts       {"2" (thrift/mk-bolt-spec {"1" ["word"]}
                                                (TestWordBytesCounter.)
                                                :parallelism-hint bolt-tasks)}
          topology    (thrift/mk-topology spouts bolts)
          ;; The two byte arrays are created once and the same pair of tuples
          ;; is repeated `rounds` times.
          word-tuples (apply concat
                             (repeat rounds
                                     [[(.getBytes "a")]
                                      [(.getBytes "b")]]))
          results     (complete-topology cluster
                                         topology
                                         :mock-sources {"1" word-tuples})
          ;; ["a" 1] .. ["a" rounds] followed by ["b" 1] .. ["b" rounds].
          expected    (for [value ["a" "b"]
                            sum   (range 1 (inc rounds))]
                        [value sum])]
      ;; ms= compares as multisets, so emission order does not matter.
      (is (ms= expected
               (read-tuples results "2"))))))
;; Identity bolt used by the custom-grouping test: declares a single output
;; field "val", re-emits the values of each incoming tuple as-is, and then
;; acks the incoming tuple. The emit call passes no extra options.
(defbolt id-bolt ["val"] [input out]
  (let [values (.getValues input)]
    (emit-bolt! out values)
    (ack! out input)))
;; Custom grouping test: the same NGrouping class is attached to a bolt in two
;; ways -- as a live instance, (NGrouping. 2), and as a thrift JavaObject
;; description naming the class plus an int constructor argument of 3.
;;
;; With two source tuples ["a"] and ["b"], the assertions expect every tuple
;; to show up twice on bolt "2" and three times on bolt "3" -- presumably
;; NGrouping delivers each tuple to n of the target tasks (NGrouping is
;; defined elsewhere; confirm there).
(deftest test-custom-groupings
  (with-simulated-time-local-cluster [cluster]
    (let [instance-grouping (NGrouping. 2)
          object-grouping   (JavaObject. "backtype.storm.testing.NGrouping"
                                         [(JavaObjectArg/int_arg 3)])
          spouts            {"1" (spout-spec (TestWordSpout. true))}
          bolts             {"2" (bolt-spec {"1" instance-grouping}
                                            id-bolt
                                            :p 4)
                             "3" (bolt-spec {"1" object-grouping}
                                            id-bolt
                                            :p 6)}
          ;; Named `topo` so the local does not shadow the `topology` function.
          topo              (topology spouts bolts)
          results           (complete-topology cluster
                                               topo
                                               :mock-sources {"1" [["a"] ["b"]]})]
      ;; Bolt "2" uses the grouping built with argument 2.
      (is (ms= [["a"] ["a"] ["b"] ["b"]]
               (read-tuples results "2")))
      ;; Bolt "3" uses the grouping described with int argument 3.
      (is (ms= [["a"] ["a"] ["a"] ["b"] ["b"] ["b"]]
               (read-tuples results "3"))))))