;; blob: 745678cac2346204b6c0bf9d775f75bdf3f71b5d
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.starter.clj.bolts-test
(:require [clojure.test :refer :all]
[org.apache.storm.starter.clj.word-count :refer [word-count split-sentence]]
[org.apache.storm.starter.clj.exclamation :refer [exclamation-bolt]]
[org.apache.storm.starter.clj.bolts :refer
[rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]]
[org.apache.storm [testing :refer :all]])
(:import [org.apache.storm Constants Testing]
[org.apache.storm.testing MkTupleParam]
[org.apache.storm.task OutputCollector IOutputCollector]
[org.apache.storm.starter.tools Rankable]
[org.apache.storm.tuple Tuple]
[java.util ArrayList]))
(defn execute-tuples
  "Runs `bolt` against `tuples` and returns a vector of everything it emitted.

  The bolt is prepared with an empty config map, a nil context and an
  OutputCollector wrapping a reified IOutputCollector that records the
  values passed to `emit` and treats `ack` as a no-op. Only `emit` and
  `ack` are implemented on the reified collector.

  `tuples` is either a vector of tuples (each one is executed in order)
  or a single tuple (executed once)."
  [bolt tuples]
  (let [emitted   (atom [])
        collector (OutputCollector.
                    (reify IOutputCollector
                      ;; Stream id and anchors are ignored; only the
                      ;; emitted values are captured.
                      (emit [_ _ _ values]
                        (swap! emitted conj values))
                      (ack [_ input])))]
    (.prepare bolt {} nil collector)
    ;; Treat a lone tuple as a one-element batch so both input shapes
    ;; share the same execution loop.
    (doseq [t (if (vector? tuples) tuples [tuples])]
      (.execute bolt t))
    @emitted))
(defn- mock-tuple
  "Builds a test tuple via Testing/testTuple from the map `m`: the map's
  keys become the tuple's field list and the map's values become the
  tuple's values.

  Optional keyword arguments:
    :component  source component id (defaults to \"1\")
    :stream-id  source stream id    (defaults to \"1\")"
  [m & {:keys [component stream-id]
        :or   {component "1" stream-id "1"}}]
  (let [param (doto (MkTupleParam.)
                (.setStream stream-id)
                (.setComponent component)
                ;; .keySet (not `keys`) so that an empty map still yields
                ;; a collection rather than nil.
                (.setFieldsList (ArrayList. (.keySet m))))]
    (Testing/testTuple (ArrayList. (.values m)) param)))
(def ^:private tick-tuple
  "A field-less mock tuple whose source component is
  Constants/SYSTEM_COMPONENT_ID and whose stream is
  Constants/SYSTEM_TICK_STREAM_ID; the tests feed it to bolts in place
  of a tick tuple."
  (mock-tuple {}
              :stream-id Constants/SYSTEM_TICK_STREAM_ID
              :component Constants/SYSTEM_COMPONENT_ID))
;; Feeds one sentence tuple to `split-sentence` and expects one emitted
;; single-element tuple per word, in sentence order.
(deftest test-split-sentence
  (testing "Bolt emits word per sentence"
    (let [sentence (mock-tuple {"sentence" "the cat jumped over the door"})
          emitted  (execute-tuples split-sentence sentence)
          expected [["the"] ["cat"] ["jumped"] ["over"] ["the"] ["door"]]]
      (is (= expected emitted)))))
;; Feeds three word tuples to `word-count` and compares the emitted
;; [word count] pairs with `ms=` from org.apache.storm.testing.
(deftest test-word-count
  (testing "Bolt emits new count"
    (let [inputs  [(mock-tuple {"word" "the"})
                   (mock-tuple {"word" "the"})
                   (mock-tuple {"word" "cat"})]
          emitted (execute-tuples word-count inputs)]
      (is (ms= [["the" 1] ["the" 2] ["cat" 1]] emitted)))))
;; Feeds a single word tuple to `exclamation-bolt` and expects the word
;; back with "!!!" appended.
(deftest test-exclamation-bolt
  (testing "Bolt emits word with exclamation marks"
    (let [input   (mock-tuple {"word" "nathan"})
          emitted (execute-tuples exclamation-bolt input)]
      (is (= [["nathan!!!"]] emitted)))))
;; Exercises `rolling-count-bolt` (constructed with arguments 9 and 3):
;; a tick alone emits nothing, while a word followed by a tick emits
;; one tuple for that word. Each case uses a fresh bolt instance.
(deftest test-rolling-bolt
  (testing "Emits nothing if no object has been counted"
    (let [bolt    (rolling-count-bolt 9 3)
          emitted (execute-tuples bolt tick-tuple)]
      (is (empty? emitted))))
  (testing "Emits something if object was counted"
    (let [bolt    (rolling-count-bolt 9 3)
          inputs  [(mock-tuple {"word" "nathan"}) tick-tuple]
          emitted (execute-tuples bolt inputs)]
      (is (= [["nathan" 1 0]] emitted)))))
;; Exercises `intermediate-rankings-bolt` (constructed with arguments 5
;; and 2): a tick tuple must produce output, a regular obj/count tuple
;; must not. Each case uses a fresh bolt instance.
(deftest test-intermediate-rankings-bolt
  (testing "Emits rankings for tick tuple"
    (let [bolt    (intermediate-rankings-bolt 5 2)
          emitted (execute-tuples bolt tick-tuple)]
      (is (seq emitted))))
  (testing "Emits nothing for normal tuple"
    (let [bolt    (intermediate-rankings-bolt 5 2)
          input   (mock-tuple {"obj" "nathan" "count" 1})
          emitted (execute-tuples bolt input)]
      (is (empty? emitted)))))
(defn- mock-rankable
  "Creates a Rankable whose `getObject` returns `object` and whose
  `getCount` returns `count`. Only those two methods are implemented on
  the reified instance."
  ;; The docstring must precede the parameter vector; placed after it,
  ;; the string is just an evaluated-and-discarded body expression and
  ;; never reaches the var's :doc metadata.
  [object count]
  (reify Rankable
    (getCount [_] count)
    (getObject [_] object)))
;; Exercises `total-rankings-bolt` (constructed with arguments 5 and 2):
;; a tick tuple must produce output, a regular "rankings" tuple must
;; not. Each case uses a fresh bolt instance.
(deftest test-total-rankings-bolt
  (testing "Emits rankings for tick tuple"
    (let [bolt    (total-rankings-bolt 5 2)
          emitted (execute-tuples bolt tick-tuple)]
      (is (seq emitted))))
  (testing "Emits nothing for normal tuple"
    (let [bolt    (total-rankings-bolt 5 2)
          input   (mock-tuple {"rankings" (mock-rankable "nathan" 2)})
          emitted (execute-tuples bolt input)]
      (is (empty? emitted)))))