blob: 4a1b16f4c3278b52f6469e5a2dfbae57ac592eca [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.drpc-test
(:use [clojure test])
(:import [org.apache.storm.drpc ReturnResults DRPCSpout
LinearDRPCTopologyBuilder DRPCInvocationsClient]
[org.apache.storm.utils ServiceRegistry Utils])
(:import [org.apache.storm.topology FailedException])
(:import [org.apache.storm.coordination CoordinatedBolt$FinishedCallback])
(:import [org.apache.storm ILocalDRPC LocalDRPC LocalCluster])
(:import [org.apache.storm.tuple Fields])
(:import [org.mockito Mockito])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.storm.spout SpoutOutputCollector])
(:import [org.apache.storm.generated DRPCExecutionException DRPCRequest])
(:import [java.util.concurrent ConcurrentLinkedQueue])
(:import [org.apache.storm Thrift])
(:import [org.mockito ArgumentCaptor Mockito ArgumentMatchers])
(:use [org.apache.storm config])
(:use [org.apache.storm clojure])
(:use [conjure core]))
(defbolt exclamation-bolt ["result" "return-info"] [tuple collector]
(emit-bolt! collector
[(str (.getString tuple 0) "!!!") (.getValue tuple 1)]
:anchor tuple)
(ack! collector tuple)
)
(deftest test-drpc-flow
(let [cluster (LocalCluster.)
drpc (LocalDRPC. (.getMetricRegistry cluster))
spout (DRPCSpout. "test" drpc)
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails spout)}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareShuffleGrouping)}
exclamation-bolt)
"3" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareGlobalGrouping)}
(ReturnResults.))})]
(.submitTopology cluster "test" {} topology)
(is (= "aaa!!!" (.execute drpc "test" "aaa")))
(is (= "b!!!" (.execute drpc "test" "b")))
(is (= "c!!!" (.execute drpc "test" "c")))
(.shutdown cluster)
(.shutdown drpc)
))
(defbolt exclamation-bolt-drpc ["id" "result"] [tuple collector]
(emit-bolt! collector
[(.getValue tuple 0) (str (.getString tuple 1) "!!!")]
:anchor tuple)
(ack! collector tuple)
)
(deftest test-drpc-builder
(let [cluster (LocalCluster.)
drpc (LocalDRPC. (.getMetricRegistry cluster))
builder (LinearDRPCTopologyBuilder. "test")
]
(.addBolt builder exclamation-bolt-drpc 3)
(.submitTopology cluster
"builder-test"
{}
(.createLocalTopology builder drpc))
(is (= "aaa!!!" (.execute drpc "test" "aaa")))
(is (= "b!!!" (.execute drpc "test" "b")))
(is (= "c!!!" (.execute drpc "test" "c")))
(.shutdown cluster)
(.shutdown drpc)
))
(defn safe-inc [v]
(if v (inc v) 1))
(defbolt partial-count ["request" "count"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [id (.getValue tuple 0)]
(swap! counts update-in [id] safe-inc)
(ack! collector tuple)
))
CoordinatedBolt$FinishedCallback
(finishedId [this id]
(emit-bolt! collector [id (get @counts id 0)])
))
))
(defn safe+ [v1 v2]
(if v1 (+ v1 v2) v2))
(defbolt count-aggregator ["request" "total"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [id (.getValue tuple 0)
count (.getValue tuple 1)]
(swap! counts update-in [id] safe+ count)
(ack! collector tuple)
))
CoordinatedBolt$FinishedCallback
(finishedId [this id]
(emit-bolt! collector [id (get @counts id 0)])
))
))
(defbolt create-tuples ["request"] [tuple collector]
(let [id (.getValue tuple 0)
amt (Integer/parseInt (.getValue tuple 1))]
(doseq [i (range (* amt amt))]
(emit-bolt! collector [id] :anchor tuple))
(ack! collector tuple)
))
(deftest test-drpc-coordination
(let [cluster (LocalCluster.)
drpc (LocalDRPC. (.getMetricRegistry cluster))
builder (LinearDRPCTopologyBuilder. "square")
]
(.addBolt builder create-tuples 3)
(doto (.addBolt builder partial-count 3)
(.shuffleGrouping))
(doto (.addBolt builder count-aggregator 3)
(.fieldsGrouping (Fields. ["request"])))
(.submitTopology cluster
"squared"
{}
(.createLocalTopology builder drpc))
(is (= "4" (.execute drpc "square" "2")))
(is (= "100" (.execute drpc "square" "10")))
(is (= "1" (.execute drpc "square" "1")))
(is (= "0" (.execute drpc "square" "0")))
(.shutdown cluster)
(.shutdown drpc)
))
(defbolt id-bolt ["request" "val"] [tuple collector]
(emit-bolt! collector
(.getValues tuple)
:anchor tuple)
(ack! collector tuple))
(defbolt emit-finish ["request" "result"] {:prepare true}
[conf context collector]
(bolt
(execute [tuple]
(ack! collector tuple)
)
CoordinatedBolt$FinishedCallback
(finishedId [this id]
(emit-bolt! collector [id "done"])
)))
(deftest test-drpc-coordination-tricky
(let [cluster (LocalCluster.)
drpc (LocalDRPC. (.getMetricRegistry cluster))
builder (LinearDRPCTopologyBuilder. "tricky")
]
(.addBolt builder id-bolt 3)
(doto (.addBolt builder id-bolt 3)
(.shuffleGrouping))
(doto (.addBolt builder emit-finish 3)
(.fieldsGrouping (Fields. ["request"])))
(.submitTopology cluster
"tricky"
{}
(.createLocalTopology builder drpc))
(is (= "done" (.execute drpc "tricky" "2")))
(is (= "done" (.execute drpc "tricky" "3")))
(is (= "done" (.execute drpc "tricky" "4")))
(.shutdown cluster)
(.shutdown drpc)
))
(defbolt fail-finish-bolt ["request" "result"] {:prepare true}
[conf context collector]
(bolt
(execute [tuple]
(ack! collector tuple))
CoordinatedBolt$FinishedCallback
(finishedId [this id]
(throw (FailedException.))
)))
(deftest test-drpc-fail-finish
(let [cluster (LocalCluster.)
drpc (LocalDRPC. (.getMetricRegistry cluster))
builder (LinearDRPCTopologyBuilder. "fail2")
]
(.addBolt builder fail-finish-bolt 3)
(.submitTopology cluster
"fail2"
{}
(.createLocalTopology builder drpc))
(is (thrown? DRPCExecutionException (.execute drpc "fail2" "2")))
(.shutdown cluster)
(.shutdown drpc)
))
(deftest test-drpc-attempts-two-reconnects-in-fail-request
(let [handler (Mockito/mock DRPCInvocationsClient
(.extraInterfaces (Mockito/withSettings) (into-array Class [ILocalDRPC])))
;; used to capture the msgId on .emit
output-collector (Mockito/mock SpoutOutputCollector)
captor (ArgumentCaptor/forClass Object)
;; pick up service-id from registry
service-id (ServiceRegistry/registerService handler)]
;; mock getServiceId s.t. DRPCSpout uses our handler
(. (Mockito/when (.getServiceId handler)) thenReturn service-id)
;; mock fetchRequest s.t. DRPCSpout has a request to process on nextTuple
(. (Mockito/when (.fetchRequest handler (ArgumentMatchers/anyString))) thenReturn (DRPCRequest. "square 2" "bar"))
;; mock failRequest s.t. DRPCSpout attempts retry on .fail
(.failRequest (.when (Mockito/doThrow (into-array [(DRPCExecutionException.)])) handler) (ArgumentMatchers/anyString))
(let [spout (DRPCSpout. "test" handler)]
;; tell the spout to use the mock collector
(.open spout nil nil output-collector)
;; pick up a msgId to fail
(.nextTuple spout)
(.emit (Mockito/verify output-collector) (Mockito/anyList) (.capture captor))
;; fail the msg
(.fail spout (.getValue captor))
;; attempt 2 reconnects
(.reconnectClient (Mockito/verify handler (Mockito/times 2)))
(.failRequest (Mockito/verify handler (Mockito/times 3)) (ArgumentMatchers/anyString)))))
(deftest test-drpc-stops-retrying-after-successful-reconnect
(let [handler (Mockito/mock DRPCInvocationsClient
(.extraInterfaces (Mockito/withSettings) (into-array Class [ILocalDRPC])))
;; used to capture the msgId on .emit
output-collector (Mockito/mock SpoutOutputCollector)
captor (ArgumentCaptor/forClass Object)
;; pick up service-id from registry
service-id (ServiceRegistry/registerService handler)]
;; mock getServiceId s.t. DRPCSpout uses our handler
(. (Mockito/when (.getServiceId handler)) thenReturn service-id)
;; mock fetchRequest s.t. DRPCSpout has a request to process on nextTuple
(. (Mockito/when (.fetchRequest handler (ArgumentMatchers/anyString))) thenReturn (DRPCRequest. "square 2" "bar"))
;; mock failRequest s.t. DRPCSpout attempts retry on .fail the first time only, but succeed the second time
(.failRequest (.when (.doNothing
(Mockito/doThrow (into-array [(DRPCExecutionException.)]))) handler) (ArgumentMatchers/anyString))
(let [spout (DRPCSpout. "test" handler)]
;; tell the spout to use the mock collector
(.open spout nil nil output-collector)
;; pick up a msgId to fail
(.nextTuple spout)
(.emit (Mockito/verify output-collector) (Mockito/anyList) (.capture captor))
;; fail the msg
(.fail spout (.getValue captor))
;; reconnect once after a failure
(.reconnectClient (Mockito/verify handler (Mockito/times 1)))
(.failRequest (Mockito/verify handler (Mockito/times 2)) (ArgumentMatchers/anyString)))))