;; blob: 36f3df6bb595cb3d98f6c89e88b9d01e8b734d52 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.testing4j-test
(:use [clojure.test])
(:use [backtype.storm config clojure testing util])
(:require [backtype.storm.integration-test :as it])
(:require [backtype.storm.thrift :as thrift])
(:import [backtype.storm Testing Config ILocalCluster])
(:import [backtype.storm.tuple Values Tuple])
(:import [backtype.storm.utils Time Utils])
(:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
AckFailMapTracker MkTupleParam]))
;; Checks that Testing/withSimulatedTime reports simulated time inside the
;; supplied callback and that Time is not simulating before or after the call.
(deftest test-with-simulated-time
  (is (false? (Time/isSimulating)))
  (Testing/withSimulatedTime
    #(is (true? (Time/isSimulating))))
  (is (false? (Time/isSimulating))))
;; Checks that Testing/withLocalCluster hands the TestJob a non-nil cluster
;; whose state is non-nil and contains a non-nil :nimbus entry.
;;
;; The cluster is created with 2 supervisors and 5 ports per supervisor.
;; NOTE: no daemon configuration is attached to the MkClusterParam, so the
;; cluster runs with whatever daemon defaults Testing/withLocalCluster applies.
(deftest test-with-local-cluster
  (let [mk-cluster-param (doto (MkClusterParam.)
                           (.setSupervisors (int 2))
                           (.setPortsPerSupervisor (int 5)))]
    (Testing/withLocalCluster
      mk-cluster-param
      (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (is (not (nil? cluster)))
          (is (not (nil? (.getState cluster))))
          (is (not (nil? (:nimbus (.getState cluster))))))))))
;; Checks that Testing/withSimulatedTimeLocalCluster:
;;   * passes a non-nil cluster (with non-nil state and :nimbus) to the job,
;;   * has Time simulating while the job runs, and
;;   * leaves Time not simulating before and after the call.
;;
;; The cluster is created with 2 supervisors.
;; NOTE: no daemon configuration is attached to the MkClusterParam.
(deftest test-with-simulated-time-local-cluster
  (let [mk-cluster-param (doto (MkClusterParam.)
                           (.setSupervisors (int 2)))]
    (is (not (Time/isSimulating)))
    (Testing/withSimulatedTimeLocalCluster
      mk-cluster-param
      (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (is (not (nil? cluster)))
          (is (not (nil? (.getState cluster))))
          (is (not (nil? (:nimbus (.getState cluster)))))
          (is (Time/isSimulating)))))
    (is (not (Time/isSimulating)))))
;; Runs a word-count style topology to completion through the Java testing API
;; (Testing/completeTopology) and checks the tuples emitted by every component.
;;
;; The test is executed twice, once with STORM-LOCAL-MODE-ZMQ set to true and
;; once with it set to false. The per-iteration MkClusterParam (4 supervisors
;; plus that daemon conf) must be handed to withSimulatedTimeLocalCluster;
;; otherwise both iterations would run on an identical default cluster.
;;
;; Topology:
;;   "1" TestWordSpout (mocked below)      parallelism 3
;;   "2" TestWordCounter, fields-grouped on "word" from "1", parallelism 4
;;   "3" TestGlobalCount, global grouping from "1"
;;   "4" TestAggregatesCounter, global grouping from "2"
(deftest test-complete-topology
  (doseq [zmq-on? [true false]
          :let [daemon-conf (doto (Config.)
                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
                mk-cluster-param (doto (MkClusterParam.)
                                   (.setSupervisors (int 4))
                                   (.setDaemonConf daemon-conf))]]
    (Testing/withSimulatedTimeLocalCluster
      ;; Pass the cluster parameters so the supervisor count and the ZMQ
      ;; setting actually apply to the cluster under test.
      mk-cluster-param
      (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (let [topology (thrift/mk-topology
                           {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
                           {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
                            "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
                            "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))})
                ;; Replace the output of spout "1" with four fixed tuples.
                mocked-sources (doto (MockedSources.)
                                 (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
                                                                       (Values. (into-array ["bob"]))
                                                                       (Values. (into-array ["joey"]))
                                                                       (Values. (into-array ["nathan"]))])))
                storm-conf (doto (Config.)
                             (.setNumWorkers 2))
                complete-topology-param (doto (CompleteTopologyParam.)
                                          (.setMockedSources mocked-sources)
                                          (.setStormConf storm-conf))
                results (Testing/completeTopology cluster
                                                  topology
                                                  complete-topology-param)]
            ;; Components "1" and "2" are compared as multisets (order is not
            ;; asserted); "3" and "4" are compared as ordered sequences.
            (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
                                    (Testing/readTuples results "1")))
            (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
                                    (Testing/readTuples results "2")))
            (is (= [[1] [2] [3] [4]]
                   (Testing/readTuples results "3")))
            (is (= [[1] [2] [3] [4]]
                   (Testing/readTuples results "4")))))))))
;; Exercises Testing/withTrackedCluster, Testing/mkTrackedTopology and
;; Testing/trackedWait.
;;
;; Spout "1" feeds two identity bolts ("2" and "3"), both of which feed
;; "4" = (it/agg-bolt 4). The test feeds one tuple and expects the checker to
;; report 0, then feeds a second tuple and expects it to report 2.
;; NOTE(review): presumably the agg bolt only acks once it has seen 4 tuples,
;; which is why nothing is acked after the first feed - confirm against
;; backtype.storm.integration-test.
(deftest test-with-tracked-cluster
  (Testing/withTrackedCluster
    (reify TestJob
      (^void run [this ^ILocalCluster cluster]
        (let [[num-feeder assert-acked-count] (it/ack-tracking-feeder ["num"])
              tracked-topology (Testing/mkTrackedTopology
                                 cluster
                                 (topology
                                   {"1" (spout-spec num-feeder)}
                                   {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
                                    "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
                                    "4" (bolt-spec {"2" :shuffle
                                                    "3" :shuffle}
                                                   (it/agg-bolt 4))}))
              ;; Feed a single tuple and wait for the tracked topology.
              feed-and-wait (fn []
                              (.feed num-feeder [1])
                              (Testing/trackedWait tracked-topology (int 1)))]
          (.submitTopology cluster "test-acking2" (Config.) (.getTopology tracked-topology))
          (feed-and-wait)
          (assert-acked-count 0)
          (feed-and-wait)
          (assert-acked-count 2))))))
;; Exercises Testing/advanceClusterTime with message timeouts enabled
;; (TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true in the daemon conf) and a 10 second
;; TOPOLOGY-MESSAGE-TIMEOUT-SECS on the topology.
;;
;; Three tuples (message ids 1, 2, 3) are fed through it/ack-every-other.
;; After advancing 9 seconds, ids 1 and 3 must be acked and id 2 must not have
;; failed yet; after advancing a further 12 seconds id 2 must have failed.
(deftest test-advance-cluster-time
  (let [cluster-param (doto (MkClusterParam.)
                        (.setDaemonConf (doto (Config.)
                                          (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))))]
    (Testing/withSimulatedTimeLocalCluster
      cluster-param
      (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (let [ack-tracker (AckFailMapTracker.)
                ;; The tracker records acks/fails reported for the spout.
                spout (doto (feeder-spout ["field1"])
                        (.setAckFailDelegate ack-tracker))
                timeout-topology (thrift/mk-topology
                                   {"1" (thrift/mk-spout-spec spout)}
                                   {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
                topology-conf (doto (Config.)
                                (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
            (.submitTopology cluster "timeout-tester" topology-conf timeout-topology)
            ;; Feed ["a"], ["b"], ["c"] with message ids 1, 2, 3 in that order.
            (doseq [[word msg-id] [["a" 1] ["b" 2] ["c" 3]]]
              (.feed spout [word] msg-id))
            (Testing/advanceClusterTime cluster (int 9))
            (it/assert-acked ack-tracker 1 3)
            (is (not (.isFailed ack-tracker 2)))
            (Testing/advanceClusterTime cluster (int 12))
            (it/assert-failed ack-tracker 2)))))))
;; Counterpart of the cluster-time test with message timeouts switched off:
;; TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS is false in both the daemon conf and the
;; topology conf, while TOPOLOGY-MESSAGE-TIMEOUT-SECS is still 10.
;;
;; Three tuples (message ids 1, 2, 3) are fed through it/ack-every-other.
;; Ids 1 and 3 must be acked after advancing 9 seconds, and id 2 must not be
;; failed either then or after advancing a further 12 seconds.
(deftest test-disable-tuple-timeout
  (let [cluster-param (doto (MkClusterParam.)
                        (.setDaemonConf (doto (Config.)
                                          (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))))]
    (Testing/withSimulatedTimeLocalCluster
      cluster-param
      (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (let [ack-tracker (AckFailMapTracker.)
                ;; The tracker records acks/fails reported for the spout.
                spout (doto (feeder-spout ["field1"])
                        (.setAckFailDelegate ack-tracker))
                no-timeout-topology (thrift/mk-topology
                                      {"1" (thrift/mk-spout-spec spout)}
                                      {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
                topology-conf (doto (Config.)
                                (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
                                (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
            (.submitTopology cluster "disable-timeout-tester" topology-conf no-timeout-topology)
            ;; Feed ["a"], ["b"], ["c"] with message ids 1, 2, 3 in that order.
            (doseq [[word msg-id] [["a" 1] ["b" 2] ["c" 3]]]
              (.feed spout [word] msg-id))
            (Testing/advanceClusterTime cluster (int 9))
            (it/assert-acked ack-tracker 1 3)
            (is (not (.isFailed ack-tracker 2)))
            (Testing/advanceClusterTime cluster (int 12))
            (is (not (.isFailed ack-tracker 2)))))))))
;; Covers both signatures of Testing/testTuple:
;;   * values only - expects the default stream id, fields
;;     ["field1" "field2"] and source component "component";
;;   * values plus MkTupleParam - expects the stream, component and fields
;;     configured on the param.
(deftest test-test-tuple
  ;; one-param signature
  (let [^Tuple default-tuple (Testing/testTuple ["james" "bond"])]
    (is (= ["james" "bond"] (.getValues default-tuple)))
    (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId default-tuple)))
    (is (= ["field1" "field2"] (-> default-tuple .getFields .toList)))
    (is (= "component" (.getSourceComponent default-tuple))))
  ;; two-params signature
  (let [tuple-param (doto (MkTupleParam.)
                      (.setStream "test-stream")
                      (.setComponent "test-component")
                      (.setFields (into-array String ["fname" "lname"])))
        ^Tuple custom-tuple (Testing/testTuple ["james" "bond"] tuple-param)]
    (is (= ["james" "bond"] (.getValues custom-tuple)))
    (is (= "test-stream" (.getSourceStreamId custom-tuple)))
    (is (= ["fname" "lname"] (-> custom-tuple .getFields .toList)))
    (is (= "test-component" (.getSourceComponent custom-tuple)))))