blob: c216008e36841e6cd079551e22072389b97ed8bb [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.grouping-test
(:use [clojure test])
(:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
[org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct])
(:import [org.apache.storm.grouping LoadMapping])
(:use [org.apache.storm log config])
(:use [org.apache.storm clojure])
(:import [org.apache.storm LocalCluster$Builder Testing Thrift])
(:import [org.apache.storm.utils Utils]
(org.apache.storm.daemon GrouperFactory)))
(def shuffle-grouping (Grouping/shuffle (NullStruct. )))
(deftest test-shuffle
(let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true})
num-messages 100000
min-prcnt (int (* num-messages 0.49))
max-prcnt (int (* num-messages 0.51))
data [1 2]
freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data)))
load1 (.get freq [(int 1)])
load2 (.get freq [(int 2)])]
(log-message "FREQ:" freq)
(is (>= load1 min-prcnt))
(is (<= load1 max-prcnt))
(is (>= load2 min-prcnt))
(is (<= load2 max-prcnt))))
(deftest test-field
(with-open [cluster (.build (doto (LocalCluster$Builder.)
(.withSimulatedTime)
(.withSupervisors 4)))]
(let [spout-phint 4
bolt-phint 6
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true) (Integer. spout-phint))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareFieldsGrouping ["word"])}
(TestWordBytesCounter.) (Integer. spout-phint))
})
results (Testing/completeTopology
cluster
topology
(doto (CompleteTopologyParam.)
(.setMockedSources (MockedSources. {"1" (->> [[(.getBytes "a")]
[(.getBytes "b")]]
(repeat (* spout-phint bolt-phint))
(apply concat))}))))]
(is (Testing/multiseteq (apply concat
(for [value '("a" "b")
sum (range 1 (inc (* spout-phint bolt-phint)))]
[[value (int sum)]]))
(Testing/readTuples results "2"))))))
(deftest test-field
(with-open [cluster (.build (doto (LocalCluster$Builder.)
(.withSimulatedTime)
(.withSupervisors 4)))]
(let [spout-phint 4
bolt-phint 6
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true) (Integer. spout-phint))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareFieldsGrouping ["word"])}
(TestWordBytesCounter.) (Integer. bolt-phint))
})
results (Testing/completeTopology
cluster
topology
(doto (CompleteTopologyParam.)
(.setMockedSources (MockedSources. {"1" (->> [[(.getBytes "a")]
[(.getBytes "b")]]
(repeat (* spout-phint bolt-phint))
(apply concat))}))))]
(is (Testing/multiseteq (apply concat
(for [value '("a" "b")
sum (range 1 (inc (* spout-phint bolt-phint)))]
[[value (int sum)]]))
(Testing/readTuples results "2"))))))
(defbolt id-bolt ["val"] [tuple collector]
(emit-bolt! collector (.getValues tuple))
(ack! collector tuple))
(deftest test-custom-groupings
(with-open [cluster (.build (doto (LocalCluster$Builder.)
(.withSimulatedTime)
(.withSupervisors 4)))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true))}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareCustomStreamGrouping (NGrouping. (Integer. 2)))}
id-bolt
(Integer. 4))
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareCustomJavaObjectGrouping
(JavaObject. "org.apache.storm.testing.NGrouping"
[(JavaObjectArg/int_arg 3)]))}
id-bolt
(Integer. 6))
})
results (Testing/completeTopology cluster
topology
(doto (CompleteTopologyParam.)
(.setMockedSources (MockedSources. {"1" [["a"]
["b"]
]}
))))]
(is (Testing/multiseteq [["a"] ["a"] ["b"] ["b"]]
(Testing/readTuples results "2")))
(is (Testing/multiseteq [["a"] ["a"] ["a"] ["b"] ["b"] ["b"]]
(Testing/readTuples results "3")))
)))