(ns backtype.storm.metrics-test
(:use [clojure test])
(:import [backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestConfBolt AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout])
(:import [backtype.storm.task ShellBolt])
(:import [backtype.storm.spout ShellSpout])
(:import [backtype.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo])
(:import [backtype.storm.metric.api.rpc CountShellMetric])
(:import [backtype.storm.utils Utils])
(:use [backtype.storm testing clojure config])
(:use [backtype.storm.daemon common])
(:use [backtype.storm.metric testing])
(:require [backtype.storm [thrift :as thrift]]))
(defbolt acking-bolt {} {:prepare true}
[conf context collector]
(execute [tuple]
(ack! collector tuple))))
(defbolt ack-every-other {} {:prepare true}
[conf context collector]
(let [state (atom -1)]
(execute [tuple]
(let [val (swap! state -)]
(when (pos? val)
(ack! collector tuple)
(defn assert-loop [afn ids]
(while (not (every? afn ids))
(Thread/sleep 1)))
(defn assert-acked [tracker & ids]
(assert-loop #(.isAcked tracker %) ids))
(defn assert-failed [tracker & ids]
(assert-loop #(.isFailed tracker %) ids))
(defbolt count-acks {} {:prepare true}
[conf context collector]
(let [mycustommetric (CountMetric.)]
(.registerMetric context "my-custom-metric" mycustommetric 5)
(execute [tuple]
(.incr mycustommetric)
(ack! collector tuple)))))
(def metrics-data backtype.storm.metric.testing/buffer)
(defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
(while-timeout TEST-TIMEOUT-MS
(let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
(and (not= N 0) (nil? taskid->buckets))
(not-every? #(<= N %) (map (comp count second) taskid->buckets))))
;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " (-> @metrics-data (get comp-id) (get metric-name)))
(if cluster
(advance-cluster-time cluster 1)
(Thread/sleep 10))))
(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
(-> @metrics-data
(get comp-id)
(get metric-name)
(first) ;; pick first task in the list, ignore other tasks' metric data.
(or [])))
(defmacro assert-buckets! [comp-id metric-name expected cluster]
(let [N# (count ~expected)]
(wait-for-atleast-N-buckets! N# ~comp-id ~metric-name ~cluster)
(is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
(defmacro assert-metric-data-exists! [comp-id metric-name]
`(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name))))
(deftest test-custom-metric
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
(let [feeder (feeder-spout ["field1"])
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec feeder)}
{"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})]
(submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
(assert-buckets! "2" "my-custom-metric" [1] cluster)
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0] cluster)
(advance-cluster-time cluster 20)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(deftest test-custom-metric-with-multi-tasks
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
(let [feeder (feeder-spout ["field1"])
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec feeder)}
{"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
(submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology)
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
(assert-buckets! "2" "my-custom-metric" [1] cluster)
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0] cluster)
(advance-cluster-time cluster 20)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(defn mk-shell-bolt-with-metrics-spec
[inputs command & kwargs]
(let [command (into-array String command)]
(apply thrift/mk-bolt-spec inputs
(PythonShellMetricsBolt. command) kwargs)))
(deftest test-custom-metric-with-multilang-py
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
(let [feeder (feeder-spout ["field1"])
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec feeder)}
{"2" (mk-shell-bolt-with-metrics-spec {"1" :global} ["python" ""])})]
(submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
(assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
(advance-cluster-time cluster 20)
(assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
(defn mk-shell-spout-with-metrics-spec
[command & kwargs]
(let [command (into-array String command)]
(apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
(deftest test-custom-metric-with-spout-multilang-py
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000}]
(let [topology (thrift/mk-topology
{"1" (mk-shell-spout-with-metrics-spec ["python" ""])}
{"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
(submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
(advance-cluster-time cluster 7)
(assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
(deftest test-builtin-metrics-1
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
(let [feeder (feeder-spout ["field1"])
topology (thrift/mk-topology
{"myspout" (thrift/mk-spout-spec feeder)}
{"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})]
(submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 61)
(assert-buckets! "myspout" "__ack-count/default" [1] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
(advance-cluster-time cluster 120)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0] cluster)
(.feed feeder ["b"] 1)
(.feed feeder ["c"] 1)
(advance-cluster-time cluster 60)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2] cluster))))
(deftest test-builtin-metrics-2
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (thrift/mk-topology
{"myspout" (thrift/mk-spout-spec feeder)}
{"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]
(submit-local-topology (:nimbus cluster)
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
(assert-acked tracker 1)
(assert-buckets! "myspout" "__fail-count/default" [] cluster)
(assert-buckets! "myspout" "__ack-count/default" [1] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
(.feed feeder ["b"] 2)
(advance-cluster-time cluster 5)
(assert-buckets! "myspout" "__fail-count/default" [] cluster)
(assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 1] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
(advance-cluster-time cluster 15)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 15)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
(deftest test-builtin-metrics-3
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
(let [feeder (feeder-spout ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (thrift/mk-topology
{"myspout" (thrift/mk-spout-spec feeder)}
{"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})]
(submit-local-topology (:nimbus cluster)
(.feed feeder ["a"] 1)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 9)
(assert-acked tracker 1 3)
(assert-buckets! "myspout" "__ack-count/default" [2] cluster)
(assert-buckets! "myspout" "__emit-count/default" [3] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
(is (not (.isFailed tracker 2)))
(advance-cluster-time cluster 30)
(assert-failed tracker 2)
(assert-buckets! "myspout" "__fail-count/default" [1] cluster)
(assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [3 0 0 0] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
(deftest test-system-bolt
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
(let [feeder (feeder-spout ["field1"])
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec feeder)}
(submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 70)
(assert-buckets! "__system" "newWorkerEvent" [1] cluster)
(assert-metric-data-exists! "__system" "uptimeSecs")
(assert-metric-data-exists! "__system" "startTimeSecs")
(advance-cluster-time cluster 180)
(assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)