;; blob: dba9c5cf4abd9ffcb9d6817b30502ad332710ba3 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.drpc-test
(:use [clojure test])
(:import [org.apache.storm.drpc ReturnResults DRPCSpout
LinearDRPCTopologyBuilder])
(:import [org.apache.storm.topology FailedException])
(:import [org.apache.storm.coordination CoordinatedBolt$FinishedCallback])
(:import [org.apache.storm LocalDRPC LocalCluster])
(:import [org.apache.storm.tuple Fields])
(:import [org.apache.storm.generated DRPCExecutionException])
(:import [java.util.concurrent ConcurrentLinkedQueue])
(:use [org.apache.storm config testing clojure])
(:use [org.apache.storm.daemon common drpc])
(:use [conjure core]))
;; Bolt with output fields ["result" "return-info"].
;; Appends "!!!" to the string in field 0 of the incoming tuple and forwards
;; field 1 unchanged. The emitted tuple is anchored to the input, which is
;; then acked.
(defbolt exclamation-bolt ["result" "return-info"] [tuple collector]
  (let [shouted (str (.getString tuple 0) "!!!")
        return-info (.getValue tuple 1)]
    (emit-bolt! collector [shouted return-info] :anchor tuple)
    (ack! collector tuple)))
;; End-to-end DRPC round trip over a hand-wired topology:
;;   DRPCSpout ("test") -> exclamation-bolt -> ReturnResults.
;; Every request executed through the LocalDRPC must come back with "!!!"
;; appended to its argument.
;;
;; The cluster and the DRPC server are released in `finally` blocks so that a
;; failure while building or submitting the topology does not leave them
;; running for the rest of the test run.
(deftest test-drpc-flow
  (let [drpc (LocalDRPC.)]
    (try
      (let [cluster (LocalCluster.)]
        (try
          (let [spout (DRPCSpout. "test" drpc)
                drpc-topology (topology
                                {"1" (spout-spec spout)}
                                {"2" (bolt-spec {"1" :shuffle}
                                                exclamation-bolt)
                                 "3" (bolt-spec {"2" :shuffle}
                                                (ReturnResults.))})]
            (.submitTopology cluster "test" {} drpc-topology)
            (is (= "aaa!!!" (.execute drpc "test" "aaa")))
            (is (= "b!!!" (.execute drpc "test" "b")))
            (is (= "c!!!" (.execute drpc "test" "c"))))
          (finally
            ;; Same order as before: cluster first, then the DRPC server.
            (.shutdown cluster))))
      (finally
        (.shutdown drpc)))))
;; Bolt with output fields ["id" "result"].
;; Passes field 0 of the incoming tuple through untouched and appends "!!!"
;; to the string in field 1. The emitted tuple is anchored to the input,
;; which is then acked.
(defbolt exclamation-bolt-drpc ["id" "result"] [tuple collector]
  (let [request-id (.getValue tuple 0)
        shouted (str (.getString tuple 1) "!!!")]
    (emit-bolt! collector [request-id shouted] :anchor tuple)
    (ack! collector tuple)))
;; Same "!!!" round trip as a hand-wired topology, but assembled with
;; LinearDRPCTopologyBuilder for the DRPC function "test", using
;; exclamation-bolt-drpc (parallelism 3) as its only bolt.
;;
;; The cluster and the DRPC server are released in `finally` blocks so that a
;; failure while building or submitting the topology does not leave them
;; running for the rest of the test run.
(deftest test-drpc-builder
  (let [drpc (LocalDRPC.)]
    (try
      (let [cluster (LocalCluster.)]
        (try
          (let [builder (LinearDRPCTopologyBuilder. "test")]
            (.addBolt builder exclamation-bolt-drpc 3)
            (.submitTopology cluster
                             "builder-test"
                             {}
                             (.createLocalTopology builder drpc))
            (is (= "aaa!!!" (.execute drpc "test" "aaa")))
            (is (= "b!!!" (.execute drpc "test" "b")))
            (is (= "c!!!" (.execute drpc "test" "c"))))
          (finally
            ;; Same order as before: cluster first, then the DRPC server.
            (.shutdown cluster))))
      (finally
        (.shutdown drpc)))))
(defn safe-inc
  "Increments `v`, treating a missing (nil/false) value as 0, so the first
  increment of an absent counter yields 1."
  [v]
  (inc (or v 0)))
;; Prepared bolt with output fields ["request" "count"].
;; Keeps a per-instance map of request id -> number of tuples seen.
;;   execute    - bumps the counter for the id in field 0 and acks the tuple.
;;   finishedId - emits [id count] for the finished id (0 if nothing was seen).
(defbolt partial-count ["request" "count"] {:prepare true}
  [conf context collector]
  (let [seen (atom {})
        ;; Pure map update: increment the entry for request-id, creating it
        ;; on first use via safe-inc.
        bump (fn [tally request-id]
               (assoc tally request-id (safe-inc (get tally request-id))))]
    (bolt
      (execute [tuple]
        (swap! seen bump (.getValue tuple 0))
        (ack! collector tuple))
      CoordinatedBolt$FinishedCallback
      (finishedId [_ request-id]
        (emit-bolt! collector [request-id (get (deref seen) request-id 0)])))))
(defn safe+
  "Adds `v2` to the running value `v1`. When `v1` is missing (nil/false),
  `v2` is returned as-is."
  [v1 v2]
  (if-not v1
    v2
    (+ v1 v2)))
;; Prepared bolt with output fields ["request" "total"].
;; Keeps a per-instance map of request id -> running sum.
;;   execute    - adds the value in field 1 to the sum for the id in field 0,
;;                then acks the tuple.
;;   finishedId - emits [id total] for the finished id (0 if nothing was seen).
(defbolt count-aggregator ["request" "total"] {:prepare true}
  [conf context collector]
  (let [totals (atom {})
        ;; Pure map update: fold amount into the entry for request-id,
        ;; creating it on first use via safe+.
        add-amount (fn [acc request-id amount]
                     (assoc acc request-id (safe+ (get acc request-id) amount)))]
    (bolt
      (execute [tuple]
        (swap! totals add-amount (.getValue tuple 0) (.getValue tuple 1))
        (ack! collector tuple))
      CoordinatedBolt$FinishedCallback
      (finishedId [_ request-id]
        (emit-bolt! collector [request-id (get (deref totals) request-id 0)])))))
;; Bolt with output field ["request"].
;; Parses field 1 of the incoming tuple as an integer n and emits n*n
;; one-field tuples carrying the id from field 0, each anchored to the input.
;; The input is acked after all emits.
(defbolt create-tuples ["request"] [tuple collector]
  (let [request-id (.getValue tuple 0)
        n (Integer/parseInt (.getValue tuple 1))]
    (dotimes [_ (* n n)]
      (emit-bolt! collector [request-id] :anchor tuple))
    (ack! collector tuple)))
;; Coordinated DRPC pipeline for the function "square":
;;   create-tuples (emits n*n tuples per request)
;;   -> partial-count   (shuffle grouping)
;;   -> count-aggregator (fields grouping on "request").
;; The reply for argument n is expected to be the string form of n*n,
;; including "0" for a request that produces no tuples at all.
;;
;; The cluster and the DRPC server are released in `finally` blocks so that a
;; failure while building or submitting the topology does not leave them
;; running for the rest of the test run.
(deftest test-drpc-coordination
  (let [drpc (LocalDRPC.)]
    (try
      (let [cluster (LocalCluster.)]
        (try
          (let [builder (LinearDRPCTopologyBuilder. "square")]
            (.addBolt builder create-tuples 3)
            (doto (.addBolt builder partial-count 3)
              (.shuffleGrouping))
            (doto (.addBolt builder count-aggregator 3)
              (.fieldsGrouping (Fields. ["request"])))
            (.submitTopology cluster
                             "squared"
                             {}
                             (.createLocalTopology builder drpc))
            (is (= "4" (.execute drpc "square" "2")))
            (is (= "100" (.execute drpc "square" "10")))
            (is (= "1" (.execute drpc "square" "1")))
            (is (= "0" (.execute drpc "square" "0"))))
          (finally
            ;; Same order as before: cluster first, then the DRPC server.
            (.shutdown cluster))))
      (finally
        (.shutdown drpc)))))
;; Bolt with output fields ["request" "val"].
;; Identity step: re-emits the incoming tuple's values unchanged, anchored to
;; the input, and then acks the input.
(defbolt id-bolt ["request" "val"] [tuple collector]
  (let [passthrough (.getValues tuple)]
    (emit-bolt! collector passthrough :anchor tuple)
    (ack! collector tuple)))
;; Prepared bolt with output fields ["request" "result"].
;;   execute    - only acks the incoming tuple; nothing is emitted per tuple.
;;   finishedId - emits [id "done"] for the finished id.
(defbolt emit-finish ["request" "result"] {:prepare true}
  [conf context collector]
  (let [finished-marker "done"]
    (bolt
      (execute [incoming]
        (ack! collector incoming))
      CoordinatedBolt$FinishedCallback
      (finishedId [_ request-id]
        (emit-bolt! collector [request-id finished-marker])))))
;; Coordinated DRPC pipeline for the function "tricky":
;;   id-bolt -> id-bolt (shuffle grouping)
;;   -> emit-finish (fields grouping on "request").
;; emit-finish produces output only from its finished callback, so every
;; request is expected to be answered with "done" regardless of its argument.
;;
;; The cluster and the DRPC server are released in `finally` blocks so that a
;; failure while building or submitting the topology does not leave them
;; running for the rest of the test run.
(deftest test-drpc-coordination-tricky
  (let [drpc (LocalDRPC.)]
    (try
      (let [cluster (LocalCluster.)]
        (try
          (let [builder (LinearDRPCTopologyBuilder. "tricky")]
            (.addBolt builder id-bolt 3)
            (doto (.addBolt builder id-bolt 3)
              (.shuffleGrouping))
            (doto (.addBolt builder emit-finish 3)
              (.fieldsGrouping (Fields. ["request"])))
            (.submitTopology cluster
                             "tricky"
                             {}
                             (.createLocalTopology builder drpc))
            (is (= "done" (.execute drpc "tricky" "2")))
            (is (= "done" (.execute drpc "tricky" "3")))
            (is (= "done" (.execute drpc "tricky" "4"))))
          (finally
            ;; Same order as before: cluster first, then the DRPC server.
            (.shutdown cluster))))
      (finally
        (.shutdown drpc)))))
;; Prepared bolt with output fields ["request" "result"].
;;   execute    - only acks the incoming tuple.
;;   finishedId - always throws FailedException and never emits a result.
(defbolt fail-finish-bolt ["request" "result"] {:prepare true}
  [conf context collector]
  (bolt
    (execute [incoming]
      (ack! collector incoming))
    CoordinatedBolt$FinishedCallback
    (finishedId [_ request-id]
      (let [failure (FailedException.)]
        (throw failure)))))
;; DRPC function "fail2" is served by fail-finish-bolt alone, whose finished
;; callback throws FailedException. Executing the function is expected to
;; surface as a DRPCExecutionException on the caller's side.
;;
;; The cluster and the DRPC server are released in `finally` blocks so that a
;; failure while building or submitting the topology does not leave them
;; running for the rest of the test run.
(deftest test-drpc-fail-finish
  (let [drpc (LocalDRPC.)]
    (try
      (let [cluster (LocalCluster.)]
        (try
          (let [builder (LinearDRPCTopologyBuilder. "fail2")]
            (.addBolt builder fail-finish-bolt 3)
            (.submitTopology cluster
                             "fail2"
                             {}
                             (.createLocalTopology builder drpc))
            (is (thrown? DRPCExecutionException (.execute drpc "fail2" "2"))))
          (finally
            ;; Same order as before: cluster first, then the DRPC server.
            (.shutdown cluster))))
      (finally
        (.shutdown drpc)))))
;; Builds a DRPC service handler with a 2-second request timeout, with
;; acquire-queue stubbed to hand back a queue this test owns and
;; read-storm-config stubbed to return the test conf.
;; Executing a function is expected to throw DRPCExecutionException
;; (presumably the request timeout, since no topology is submitted here --
;; confirm against service-handler), and afterwards the stubbed queue must
;; hold no entries.
(deftest test-dequeue-req-after-timeout
  (let [request-queue (ConcurrentLinkedQueue.)
        timeout-secs 2
        handler-conf {DRPC-REQUEST-TIMEOUT-SECS timeout-secs}]
    (stubbing [acquire-queue request-queue
               read-storm-config handler-conf]
      (let [handler (service-handler handler-conf)]
        (is (thrown? DRPCExecutionException
                     (.execute handler "ArbitraryDRPCFunctionName" "")))
        (is (= 0 (.size request-queue)))))))
;; Builds a DRPC service handler with a 1-second request timeout, stubbing
;; acquire-queue (test-owned queue), read-storm-config (test conf) and
;; timeout-check-secs (same 1-second value).
;; Executing a function is expected to throw DRPCExecutionException
;; (presumably raised by the handler's timeout cleanup -- confirm against
;; service-handler).
(deftest test-drpc-timeout-cleanup
  (let [request-queue (ConcurrentLinkedQueue.)
        timeout-secs 1
        handler-conf {DRPC-REQUEST-TIMEOUT-SECS timeout-secs}]
    (stubbing [acquire-queue request-queue
               read-storm-config handler-conf
               timeout-check-secs timeout-secs]
      (let [handler (service-handler handler-conf)]
        (is (thrown? DRPCExecutionException
                     (.execute handler "ArbitraryDRPCFunctionName" "no-args")))))))