;; blob: 869e2a7384c1e0439aa528c543ac5a238292aee9 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.clojure-test
(:use [clojure test])
(:import [org.apache.storm.testing TestWordSpout TestPlannerSpout]
[org.apache.storm.tuple Fields])
(:use [org.apache.storm testing clojure config])
(:use [org.apache.storm.daemon common])
(:require [org.apache.storm [thrift :as thrift]])
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.utils Utils]))
;; Simple (non-prepared) bolt declaring the single output field "word".
;; For every incoming tuple it takes the tuple's first value, appends
;; "lalala", emits the result anchored to the input tuple and then acks
;; the input tuple.
(defbolt lalala-bolt1 ["word"]
  [[word :as tuple] collector]
  (emit-bolt! collector [(str word "lalala")] :anchor tuple)
  (ack! collector tuple))
;; Prepared bolt declaring the single output field "word".
;; The function body runs with [conf context collector]: it creates an atom,
;; sets it to "lalala", and then builds the bolt object. Each execute call
;; appends the atom's current value to the first value of the incoming tuple,
;; emits the result anchored to that tuple and acks the tuple.
(defbolt lalala-bolt2 ["word"] {:prepare true}
  [conf context collector]
  (let [suffix (doto (atom nil)
                 (reset! "lalala"))]
    (bolt
      (execute [tuple]
        (let [word (.getValue tuple 0)]
          (emit-bolt! collector [(str word @suffix)] :anchor tuple)
          (ack! collector tuple))))))
;; Parameterised prepared bolt declaring the single output field "word".
;; It is constructed with one parameter, `prefix`. The bolt's prepare method
;; stores (str prefix "lalala") in an atom; execute appends the atom's current
;; value to the first value of the incoming tuple, emits the result anchored
;; to that tuple and acks the tuple.
(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
  [conf context collector]
  (let [suffix (atom nil)]
    (bolt
      (prepare [_ _ _]
        (reset! suffix (str prefix "lalala")))
      ;; The "word" field is destructured into `word` but not used below;
      ;; the value is read positionally from the tuple instead.
      (execute [{word "word" :as tuple}]
        (let [decorated (str (.getValue tuple 0) @suffix)]
          (emit-bolt! collector [decorated] :anchor tuple)
          (ack! collector tuple))))))
;; Runs lalala-bolt1, lalala-bolt2 and (lalala-bolt3 "_nathan_") as bolts
;; "2", "3" and "4", each subscribed to spout "1", on a simulated-time local
;; cluster with 4 supervisors. Spout "1" is mocked to emit ["david"] and
;; ["adam"]; the assertions check (via ms=) the tuples each bolt emitted.
(deftest test-clojure-bolt
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [topology (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
                     {"2" (Thrift/prepareBoltDetails
                            {(Utils/getGlobalStreamId "1" nil)
                             (Thrift/prepareShuffleGrouping)}
                            lalala-bolt1)
                      "3" (Thrift/prepareBoltDetails
                            {(Utils/getGlobalStreamId "1" nil)
                             (Thrift/prepareLocalOrShuffleGrouping)}
                            lalala-bolt2)
                      "4" (Thrift/prepareBoltDetails
                            {(Utils/getGlobalStreamId "1" nil)
                             (Thrift/prepareShuffleGrouping)}
                            (lalala-bolt3 "_nathan_"))})
          results (complete-topology cluster
                                     topology
                                     :mock-sources {"1" [["david"]
                                                         ["adam"]]})]
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
      (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4"))))))
;; Bolt declaring the output fields "word", "period", "question" and
;; "exclamation"; it emits a map rather than a vector of values.
;; - When the incoming tuple's :word is "bar", it emits a literal map whose
;;   keys are a mix of keywords and one string ("exclamation"), every value
;;   being "bar".
;; - Otherwise it emits the incoming tuple with :period, :exclamation and
;;   :question associated to the word followed by ".", "!" and "?".
;; In both cases the emit is not anchored, and the input tuple is acked
;; afterwards.
(defbolt punctuator-bolt ["word" "period" "question" "exclamation"]
  [tuple collector]
  (let [word (:word tuple)]
    (if (= word "bar")
      (emit-bolt! collector {:word "bar" :period "bar" :question "bar"
                             "exclamation" "bar"})
      (emit-bolt! collector (assoc tuple
                                   :period (str word ".")
                                   :exclamation (str word "!")
                                   :question (str word "?"))))
    (ack! collector tuple)))
;; Wires punctuator-bolt (component "out") to the spout "words" on a
;; simulated-time local cluster with 4 supervisors, mocks the spout to emit
;; ["foo"] and ["bar"], and checks (via ms=) that the bolt's emitted tuples
;; are ["foo" "foo." "foo?" "foo!"] and ["bar" "bar" "bar" "bar"].
(deftest test-map-emit
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [spouts {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
          bolt-inputs {(Utils/getGlobalStreamId "words" nil)
                       (Thrift/prepareShuffleGrouping)}
          bolts {"out" (Thrift/prepareBoltDetails bolt-inputs punctuator-bolt)}
          topology (Thrift/buildTopology spouts bolts)
          results (complete-topology cluster
                                     topology
                                     :mock-sources {"words" [["foo"] ["bar"]]})]
      (is (ms= [["foo" "foo." "foo?" "foo!"]
                ["bar" "bar" "bar" "bar"]] (read-tuples results "out"))))))
;; Prepared bolt declaring the output fields "conf" and "val". It takes one
;; parameter, `conf`, which is also passed as the bolt's :conf option.
;; For each incoming tuple the first value is treated as a lookup key:
;; - the special key "!MAX_MSG_TIMEOUT" is answered by calling
;;   .maxTopologyMessageTimeout on the context;
;; - any other key is looked up in `conf`.
;; The [key answer] pair is emitted anchored to the input tuple, which is
;; then acked.
(defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf}
  [conf context collector]
  (bolt
    (execute [tuple]
      (let [conf-key (.getValue tuple 0)
            answer (if (not= conf-key "!MAX_MSG_TIMEOUT")
                     (get conf conf-key)
                     (.maxTopologyMessageTimeout context))]
        (emit-bolt! collector [conf-key answer] :anchor tuple)
        (ack! collector tuple)))))
;; Checks the configuration values that conf-query-bolt reports when conf is
;; supplied at several levels:
;; - spout "1" is given TOPOLOGY-MESSAGE-TIMEOUT-SECS 40;
;; - conf-query-bolt is constructed with "fake.config" 1,
;;   TOPOLOGY-MAX-TASK-PARALLELISM 2 and TOPOLOGY-MAX-SPOUT-PENDING 10;
;; - prepareBoltDetails for bolt "2" is given TOPOLOGY-MAX-SPOUT-PENDING 3;
;; - the topology is submitted as "test123" with
;;   TOPOLOGY-MAX-TASK-PARALLELISM 10 and TOPOLOGY-MESSAGE-TIMEOUT-SECS 30.
;; The mocked spout emits one tuple per key to query; the assertion expects
;; "fake.config" 1, max task parallelism 2, max spout pending 3,
;; "!MAX_MSG_TIMEOUT" 40 and the topology name "test123".
(deftest test-component-specific-config-clojure
  (with-simulated-time-local-cluster [cluster]
    (let [spout-details (Thrift/prepareSpoutDetails
                          (TestPlannerSpout. (Fields. ["conf"]))
                          nil
                          {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})
          bolt-inputs {(Utils/getGlobalStreamId "1" nil)
                       (Thrift/prepareShuffleGrouping)}
          query-bolt (conf-query-bolt {"fake.config" 1
                                       TOPOLOGY-MAX-TASK-PARALLELISM 2
                                       TOPOLOGY-MAX-SPOUT-PENDING 10})
          bolt-details (Thrift/prepareBoltDetails
                         bolt-inputs
                         query-bolt
                         nil
                         {TOPOLOGY-MAX-SPOUT-PENDING 3})
          topology (Thrift/buildTopology {"1" spout-details}
                                         {"2" bolt-details})
          ;; One single-value tuple per configuration key the bolt is asked for.
          queries [["fake.config"]
                   [TOPOLOGY-MAX-TASK-PARALLELISM]
                   [TOPOLOGY-MAX-SPOUT-PENDING]
                   ["!MAX_MSG_TIMEOUT"]
                   [TOPOLOGY-NAME]]
          results (complete-topology cluster
                                     topology
                                     :topology-name "test123"
                                     :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10
                                                  TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
                                     :mock-sources {"1" queries})]
      ;; The emitted [key value] tuples are flattened and rebuilt into a map
      ;; so they can be compared regardless of emission order.
      (is (= {"fake.config" 1
              TOPOLOGY-MAX-TASK-PARALLELISM 2
              TOPOLOGY-MAX-SPOUT-PENDING 3
              "!MAX_MSG_TIMEOUT" 40
              TOPOLOGY-NAME "test123"}
             (->> (read-tuples results "2")
                  (apply concat)
                  (apply hash-map)))))))