blob: 1e374ed248e4b870c42aa0634935c33d5dbffb6f [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.testing
(:import [org.apache.storm LocalCluster$Builder])
(:import [java.util.function UnaryOperator])
(:import [org.apache.storm.utils Time Time$SimulatedTime RegisteredGlobalState Utils])
(:import [org.apache.storm.testing InProcessZookeeper MkTupleParam TestJob MkClusterParam
TrackedTopology CompleteTopologyParam MockedSources])
(:import [org.apache.storm Thrift Testing Testing$Condition])
(:import [org.apache.storm.testing MockLeaderElector])
(:import [org.json.simple JSONValue])
(:use [org.apache.storm util config log])
(:use [org.apache.storm thrift]))
(defnk add-supervisor
[cluster-map :ports 2 :conf {} :id nil]
(let [local-cluster (:local-cluster cluster-map)]
(.addSupervisor local-cluster ports conf id)))
(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
(MockLeaderElector. is-leader leader-name leader-port))
(defn local-cluster-state [local-cluster]
{:nimbus (.getNimbus local-cluster)
:daemon-conf (.getDaemonConf local-cluster)
:storm-cluster-state (.getClusterState local-cluster)
:local-cluster local-cluster})
(defnk mk-mocked-nimbus
[:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil
:leader-elector nil :group-mapper nil :nimbus-daemon false :nimbus-wrapper nil]
(let [builder (doto (LocalCluster$Builder.)
(.withDaemonConf daemon-conf)
(.withINimbus inimbus)
(.withBlobStore blob-store)
(.withClusterState cluster-state)
(.withLeaderElector leader-elector)
(.withGroupMapper group-mapper)
(.withNimbusDaemon nimbus-daemon)
(.withNimbusWrapper (when nimbus-wrapper (reify UnaryOperator (apply [this nimbus] (nimbus-wrapper nimbus))))))
local-cluster (.build builder)]
(local-cluster-state local-cluster)))
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
(let [builder (doto (LocalCluster$Builder.)
(.withSupervisors supervisors)
(.withPortsPerSupervisor ports-per-supervisor)
(.withDaemonConf daemon-conf)
(.withINimbus inimbus)
(.withGroupMapper group-mapper)
(.withSupervisorSlotPortMin supervisor-slot-port-min)
(.withNimbusDaemon nimbus-daemon))
local-cluster (.build builder)]
(local-cluster-state local-cluster)))
(defn get-supervisor [cluster-map supervisor-id]
(let [local-cluster (:local-cluster cluster-map)]
(.getSupervisor local-cluster supervisor-id)))
(defn kill-supervisor [cluster-map supervisor-id]
(let [local-cluster (:local-cluster cluster-map)]
(.killSupervisor local-cluster supervisor-id)))
(defn kill-local-storm-cluster [cluster-map]
(let [local-cluster (:local-cluster cluster-map)]
(.close local-cluster)))
(defmacro while-timeout [timeout-ms condition & body]
`(Testing/whileTimeout ~timeout-ms
(reify Testing$Condition (exec [this] ~condition))
(fn [] ~@body)))
(defn wait-for-condition
(wait-for-condition Testing/TEST_TIMEOUT_MS apredicate))
([timeout-ms apredicate]
(while-timeout timeout-ms (not (apredicate))
(Time/sleep 100))))
(defn wait-until-cluster-waiting
"Wait until the cluster is idle. Should be used with time simulation."
(let [local-cluster (:local-cluster cluster-map)]
(.waitForIdle local-cluster)))
([cluster-map timeout-ms]
(let [local-cluster (:local-cluster cluster-map)]
(.waitForIdle local-cluster timeout-ms))))
(defn advance-cluster-time
([cluster-map secs increment-secs]
(let [local-cluster (:local-cluster cluster-map)]
(.advanceClusterTime local-cluster secs increment-secs)))
([cluster-map secs]
(let [local-cluster (:local-cluster cluster-map)]
(.advanceClusterTime local-cluster secs))))
(defmacro with-mocked-nimbus
[[nimbus-sym & args] & body]
`(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
(catch Throwable t#
(log-error t# "Error in cluster")
(throw t#))
(let [keep-waiting?# (atom true)
f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
(kill-local-storm-cluster ~nimbus-sym)
(reset! keep-waiting?# false)
(defmacro with-local-cluster
[[cluster-sym & args] & body]
`(let [~cluster-sym (mk-local-storm-cluster ~@args)]
(catch Throwable t#
(log-error t# "Error in cluster")
(throw t#))
(let [keep-waiting?# (atom true)
f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
(kill-local-storm-cluster ~cluster-sym)
(reset! keep-waiting?# false)
(defmacro with-simulated-time-local-cluster
[& args]
`(with-open [_# (Time$SimulatedTime.)]
(with-local-cluster ~@args)))
(defmacro with-inprocess-zookeeper
[port-sym & body]
`(with-open [zks# (InProcessZookeeper. )]
(let [~port-sym (.getPort zks#)]
(defn submit-local-topology
[nimbus storm-name conf topology]
(when-not (Utils/isValidConf conf)
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
(.submitTopology nimbus storm-name nil (JSONValue/toJSONString conf) topology))
(defn submit-local-topology-with-opts
[nimbus storm-name conf topology submit-opts]
(when-not (Utils/isValidConf conf)
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
(.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
(defn simulate-wait
(Testing/simulateWait (:local-cluster cluster-map)))
(defn spout-objects [spec-map]
(for [[_ spout-spec] spec-map]
(-> spout-spec
(defn capture-topology
(let [cap-topo (Testing/captureTopology topology)]
{:topology (.topology cap-topo)
:capturer (.capturer cap-topo)}))
(defnk complete-topology
[cluster-map topology
:mock-sources {}
:storm-conf {}
:cleanup-state true
:topology-name nil
:timeout-ms Testing/TEST_TIMEOUT_MS]
(Testing/completeTopology (:local-cluster cluster-map) topology,
(doto (CompleteTopologyParam.)
(.setStormConf storm-conf)
(.setTopologyName topology-name)
(.setTimeoutMs timeout-ms)
(.setMockedSources (MockedSources. mock-sources))
(.setCleanupState cleanup-state))))
(defn read-tuples
([results component-id stream-id]
(Testing/readTuples results component-id stream-id))
([results component-id]
(Testing/readTuples results component-id )))
(defn ms=
[a b]
(Testing/multiseteq a b))
(def TRACKER-BOLT-ID "+++tracker-bolt")
;; TODO: should override system-topology! and wrap everything there
(defn mk-tracked-topology
([tracked-cluster topology]
(let [tt (TrackedTopology. topology (:local-cluster tracked-cluster))]
{:topology (.getTopology tt)
:tracked-topo tt})))
(defn increment-global!
[id key amt]
(-> (RegisteredGlobalState/getState id)
(get key)
(.addAndGet amt)))
(defn global-amt
[id key]
(-> (RegisteredGlobalState/getState id)
(get key)
(defnk mkClusterParam
[:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :nimbus-daemon false]
;;TODO do we need to support inimbus?, group-mapper, or supervisor-slot-port-min
(doto (MkClusterParam. )
(.setDaemonConf daemon-conf)
(.setNimbusDaemon nimbus-daemon)
(.setPortsPerSupervisor (int ports-per-supervisor))
(.setSupervisors (int supervisors))))
(defmacro with-tracked-cluster
[[cluster-sym & cluster-args] & body]
(mkClusterParam ~@cluster-args)
(reify TestJob
(run [this# lc#]
(let [~cluster-sym (local-cluster-state lc#)]
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
(Testing/trackedWait (:tracked-topo tracked-map)))
([tracked-map amt]
(Testing/trackedWait (:tracked-topo tracked-map) (int amt)))
([tracked-map amt timeout-ms]
(Testing/trackedWait (:tracked-topo tracked-map) (int amt) (int timeout-ms))))
(defnk test-tuple
:component "component"
:fields nil]
(doto (MkTupleParam. )
(.setStream stream)
(.setComponent component)
(.setFieldsList fields))))
(defmacro with-timeout
[millis unit & body]
`(let [f# (future ~@body)]
(.get f# ~millis ~unit)
(finally (future-cancel f#)))))