blob: c579c59e0688e59beb20ba6a85c7ed5baea3100c [file] [log] [blame]
(ns backtype.storm.stats
(:import [backtype.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent
NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
ClusterSummary TopologyInfo TopologySummary TaskSummary TaskStats TaskSpecificStats
SpoutStats BoltStats ErrorInfo SupervisorSummary])
(:use [backtype.storm util])
(:use [clojure.contrib.seq-utils :only [find-first]])
(:use [clojure.contrib.math :only [ceil]]))
;;TODO: consider replacing this with some sort of RRD
(defn curr-time-bucket
  "Returns the start of the bucket that contains `time-secs`: `time-secs`
  truncated (toward zero) to a multiple of `bucket-size-secs`."
  [^Integer time-secs ^Integer bucket-size-secs]
  ;; `quot` is the portable truncating integer division. `unchecked-divide`
  ;; was renamed in Clojure 1.3 and is unavailable on newer runtimes.
  (* bucket-size-secs (quot time-secs bucket-size-secs)))
;; A time-bucketed aggregate. `buckets` maps a bucket start time (as computed by
;; curr-time-bucket) to the value accumulated by `updater` for that bucket.
;; `merger` is applied to all bucket values and `extractor` to the merged result
;; when the window's value is read (see value-rolling-window).
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defn rolling-window
  "Creates a RollingWindow with the given functions and sizing, starting with
  an empty bucket map."
  [updater merger extractor bucket-size-secs num-buckets]
  (let [no-buckets {}]
    (RollingWindow. updater merger extractor bucket-size-secs num-buckets no-buckets)))
(defn update-rolling-window
  "Returns `rw` with the bucket containing `time-secs` replaced by the result
  of applying the window's updater to that bucket's current value (nil when
  the bucket does not exist yet) followed by `args`."
  ([^RollingWindow rw time-secs & args]
    ;; Plain get/assoc is kept on purpose: the original author notes it is
    ;; 2.5x faster than using update-in.
    (let [bucket-map (:buckets rw)
          slot (curr-time-bucket time-secs (:bucket-size-secs rw))
          updated (apply (:updater rw) (get bucket-map slot) args)]
      (assoc rw :buckets (assoc bucket-map slot updated)))))
(defn value-rolling-window
  "Merges the values of all buckets currently held by `rw` with its merger
  and returns the extractor applied to the merged result. With no buckets the
  merger is called with no arguments."
  [^RollingWindow rw]
  (let [extract-fn (:extractor rw)
        merge-fn (:merger rw)
        bucket-values (vals (:buckets rw))]
    (extract-fn (apply merge-fn bucket-values))))
(defn cleanup-rolling-window
  "Returns `rw` without the buckets whose start time is older than
  num-buckets * bucket-size-secs seconds before the current time."
  [^RollingWindow rw]
  (let [all-buckets (:buckets rw)
        window-secs (* (:num-buckets rw) (:bucket-size-secs rw))
        oldest-allowed (- (current-time-secs) window-secs)
        expired? (fn [bucket-start] (< bucket-start oldest-allowed))
        expired-keys (filter expired? (keys all-buckets))]
    (assoc rw :buckets (apply dissoc all-buckets expired-keys))))
(defn rolling-window-size
  "Total span covered by `rw`: bucket size multiplied by the number of buckets."
  [^RollingWindow rw]
  (let [{:keys [bucket-size-secs num-buckets]} rw]
    (* bucket-size-secs num-buckets)))
;; A group of RollingWindows that share one updater/extractor, plus an
;; `all-time` aggregate that is updated with the same updater but never
;; bucketed. `all-time` starts out as nil (see rolling-window-set).
(defrecord RollingWindowSet [updater extractor windows all-time])
(defn rolling-window-set
  "Creates a RollingWindowSet holding one RollingWindow per entry of
  `bucket-sizes`, each with `num-buckets` buckets. The all-time aggregate
  starts as nil."
  [updater merger extractor num-buckets & bucket-sizes]
  (let [make-window (fn [size]
                      (rolling-window updater merger extractor size num-buckets))
        windows (dofor [size bucket-sizes] (make-window size))]
    (RollingWindowSet. updater extractor windows nil)))
(defn update-rolling-window-set
  "Applies `args` to every window of `rws` at the current time and to the
  all-time aggregate, returning the updated set."
  ([^RollingWindowSet rws & args]
    (let [timestamp (current-time-secs)
          bump (fn [window] (apply update-rolling-window window timestamp args))
          bumped-windows (dofor [window (:windows rws)] (bump window))
          bumped-all-time (apply (:updater rws) (:all-time rws) args)]
      (assoc rws
             :windows bumped-windows
             :all-time bumped-all-time))))
(defn cleanup-rolling-window-set
  "Returns `rws` with expired buckets removed from every one of its windows.
  The all-time aggregate is left untouched."
  ([^RollingWindowSet rws]
    ;; Realize the cleaned windows eagerly. This value is swapped into an atom
    ;; (see cleanup-stat!), and a lazy `map` would postpone the time-dependent
    ;; cleanup until the first read and pile up unrealized layers when cleanup
    ;; runs repeatedly. update-rolling-window-set is eager as well.
    (let [cleaned (doall (map cleanup-rolling-window (:windows rws)))]
      (assoc rws :windows cleaned))))
(defn value-rolling-window-set
  "Returns a map from each window's size (see rolling-window-size) to that
  window's extracted value, plus an :all-time entry holding the extractor
  applied to the all-time aggregate."
  [^RollingWindowSet rws]
  (let [window-values (into {}
                            (for [window (:windows rws)]
                              [(rolling-window-size window)
                               (value-rolling-window window)]))
        all-time-value ((:extractor rws) (:all-time rws))]
    (assoc window-values :all-time all-time-value)))
(defn- incr-val
  "Returns `amap` with the counter under `key` increased by `amt` (1 when
  omitted). A missing key counts as zero; `amap` may be nil."
  ([amap key]
    (incr-val amap key 1))
  ([amap key amt]
    (let [current (get amap key (long 0))
          increased (+ current amt)]
      (assoc amap key increased))))
(defn- update-avg
  "Folds `val` into a [sum count] pair. A nil `curr` starts a new pair
  holding just `val`."
  [curr val]
  (if-not curr
    [val (long 1)]
    (let [[total n] curr]
      [(+ total val) (inc n)])))
(defn- merge-avg
  "Combines any number of [sum count] pairs into one by adding the sums and
  the counts. With no arguments the result is [0 0]."
  [& avg]
  (let [total-of (fn [component] (apply + (map component avg)))]
    [(total-of first) (total-of second)]))
(defn- extract-avg
  "Converts a [sum count] pair into its mean as a double.
  Returns 0.0 when there is nothing to average: `pair` is nil (the all-time
  value of a window set before any update) or its count is zero (merge-avg
  called with no buckets yields [0 0]). Previously those cases threw."
  [pair]
  (let [[total n] pair]
    (if (or (nil? n) (zero? n))
      0.0
      (double (/ total n)))))
(defn- update-keyed-avg
  "Returns `amap` with the [sum count] pair stored under `key` updated by
  `val` (see update-avg). A missing key starts a new pair."
  [amap key val]
  (let [previous (get amap key)
        updated (update-avg previous val)]
    (assoc amap key updated)))
(defn- merge-keyed-avg
  "Merges any number of key -> [sum count] maps, combining the pairs of keys
  present in more than one map with merge-avg."
  [& vals]
  (let [combine (partial merge-with merge-avg)]
    (apply combine vals)))
(defn- extract-keyed-avg
  "Passes `vals` through map-val with extract-avg, i.e. turns the
  [sum count] pair stored under each key into its average."
  [vals]
  (->> vals
       (map-val extract-avg)))
(defn- counter-extract
  "Returns `v` unchanged when it is truthy, otherwise an empty map."
  [v]
  (or v {}))
(defn keyed-counter-rolling-window-set
  "Creates a rolling window set of per-key counters: updates go through
  incr-val, buckets are merged by adding counts per key, and a nil value is
  reported as an empty map."
  [num-buckets & bucket-sizes]
  (let [merge-counts (partial merge-with +)]
    (apply rolling-window-set
           incr-val merge-counts counter-extract
           num-buckets bucket-sizes)))
(defn avg-rolling-window-set
  "Creates a rolling window set that tracks a single running average using
  update-avg, merge-avg and extract-avg."
  [num-buckets & bucket-sizes]
  (apply rolling-window-set
         update-avg merge-avg extract-avg
         (cons num-buckets bucket-sizes)))
(defn keyed-avg-rolling-window-set
  "Creates a rolling window set that tracks one running average per key using
  update-keyed-avg, merge-keyed-avg and extract-keyed-avg."
  [num-buckets & bucket-sizes]
  (apply rolling-window-set
         update-keyed-avg merge-keyed-avg extract-keyed-avg
         (cons num-buckets bucket-sizes)))
;; (defn choose-bucket [val buckets]
;; (let [ret (find-first #(<= val %) buckets)]
;; (if ret
;; ret
;; (* 10 (first buckets)))
;; ))
;; ;; buckets must be between 1 and 9
;; (defn to-proportional-bucket
;; "Maps to a bucket in the value's order of magnitude. So if buckets are [1 2 5],
;; 3 -> 5
;; 7 -> 10
;; 1234 -> 2000
;; etc."
;; [val buckets]
;; (cond (= 0 val) 0
;; (between? val 1 9) (choose-bucket val buckets)
;; :else (* 10 (to-proportional-bucket (ceil (/ val 10))
;; buckets))))
;; Keys of the CommonStats fields that hold rolling-window-set atoms.
(def COMMON-FIELDS [:emitted :transferred])
;; emitted and transferred are atoms of keyed counter window sets
;; (see mk-common-stats). rate is the amount added to a counter per recorded
;; event (see emitted-tuple!).
(defrecord CommonStats [emitted transferred rate])
;; Keys of the BoltTaskStats fields that hold rolling-window-set atoms.
(def BOLT-FIELDS [:acked :failed :process-latencies])
;; acked and failed count individual tuples
(defrecord BoltTaskStats [common acked failed process-latencies])
;; Keys of the SpoutTaskStats fields that hold rolling-window-set atoms.
(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;; acked and failed count tuple completion
(defrecord SpoutTaskStats [common acked failed complete-latencies])
;; Number of buckets in every stats window.
(def NUM-STAT-BUCKETS 20)
;; Bucket sizes in seconds. With NUM-STAT-BUCKETS buckets each, the windows
;; span 10 minutes, 3 hours, 1 day.
(def STAT-BUCKETS [30 540 4320])
(defn- mk-common-stats
  "Creates a CommonStats with fresh keyed counter window sets (each in its own
  atom) for emitted and transferred, and the given `rate`."
  [rate]
  (let [new-counter (fn []
                      (atom (apply keyed-counter-rolling-window-set
                                   NUM-STAT-BUCKETS STAT-BUCKETS)))
        emitted (new-counter)
        transferred (new-counter)]
    (CommonStats. emitted transferred rate)))
(defn mk-bolt-stats
  "Creates a BoltTaskStats: common stats built with `rate`, keyed counter
  window sets for acked and failed, and a keyed average window set for
  process-latencies, each wrapped in its own atom."
  [rate]
  (let [common (mk-common-stats rate)
        new-counter (fn []
                      (atom (apply keyed-counter-rolling-window-set
                                   NUM-STAT-BUCKETS STAT-BUCKETS)))
        acked (new-counter)
        failed (new-counter)
        process-latencies (atom (apply keyed-avg-rolling-window-set
                                       NUM-STAT-BUCKETS STAT-BUCKETS))]
    (BoltTaskStats. common acked failed process-latencies)))
(defn mk-spout-stats
  "Creates a SpoutTaskStats: common stats built with `rate`, keyed counter
  window sets for acked and failed, and a keyed average window set for
  complete-latencies, each wrapped in its own atom."
  [rate]
  (let [common (mk-common-stats rate)
        new-counter (fn []
                      (atom (apply keyed-counter-rolling-window-set
                                   NUM-STAT-BUCKETS STAT-BUCKETS)))
        acked (new-counter)
        failed (new-counter)
        complete-latencies (atom (apply keyed-avg-rolling-window-set
                                        NUM-STAT-BUCKETS STAT-BUCKETS))]
    (SpoutTaskStats. common acked failed complete-latencies)))
(defmacro update-task-stat!
  "Expands to a swap! that applies update-rolling-window-set with `args` to
  the atom found by threading `stats` through `path`. `path` is passed through
  collectify first, so a single key or a collection of keys may be given."
  [stats path & args]
  (let [steps (collectify path)
        target `(-> ~stats ~@steps)]
    `(swap! ~target update-rolling-window-set ~@args)))
(defmacro stats-rate
  "Expands to a lookup of the :rate stored under the :common entry of `stats`."
  [stats]
  `(:rate (:common ~stats)))
(defn emitted-tuple!
  "Adds the stats' rate to the emitted counter of `stream`."
  [stats stream]
  (let [increment (stats-rate stats)]
    (update-task-stat! stats [:common :emitted] stream increment)))
(defn transferred-tuples!
  "Adds rate * `amt` to the transferred counter of `stream`."
  [stats stream amt]
  (let [increment (* (stats-rate stats) amt)]
    (update-task-stat! stats [:common :transferred] stream increment)))
(defn bolt-acked-tuple!
  "Records an ack for the [component stream] source: adds the stats' rate to
  the acked counter and folds `latency-ms` into the process-latencies average."
  [^BoltTaskStats stats component stream latency-ms]
  (let [source-key [component stream]
        increment (stats-rate stats)]
    (update-task-stat! stats :acked source-key increment)
    (update-task-stat! stats :process-latencies source-key latency-ms)))
(defn bolt-failed-tuple!
  "Records a failure for the [component stream] source by adding the stats'
  rate to the failed counter. `latency-ms` is accepted but not used."
  [^BoltTaskStats stats component stream latency-ms]
  (let [source-key [component stream]
        increment (stats-rate stats)]
    (update-task-stat! stats :failed source-key increment)))
(defn spout-acked-tuple!
  "Records an ack on `stream`: adds the stats' rate to the acked counter and
  folds `latency-ms` into the complete-latencies average."
  [^SpoutTaskStats stats stream latency-ms]
  (let [increment (stats-rate stats)]
    (update-task-stat! stats :acked stream increment)
    (update-task-stat! stats :complete-latencies stream latency-ms)))
(defn spout-failed-tuple!
  "Records a failure on `stream` by adding the stats' rate to the failed
  counter. `latency-ms` is accepted but not used."
  [^SpoutTaskStats stats stream latency-ms]
  (let [increment (stats-rate stats)]
    (update-task-stat! stats :failed stream increment)))
(defn- cleanup-stat!
  "Replaces the rolling window set held by the atom `stat` with its
  cleaned-up version (see cleanup-rolling-window-set)."
  [stat]
  (swap! stat (fn [window-set] (cleanup-rolling-window-set window-set))))
(defn- cleanup-common-stats!
  "Cleans up every window set listed in COMMON-FIELDS on `stats`."
  [^CommonStats stats]
  (doseq [field COMMON-FIELDS]
    (let [stat-atom (get stats field)]
      (cleanup-stat! stat-atom))))
(defn cleanup-bolt-stats!
  "Cleans up the common window sets of `stats` and then every window set
  listed in BOLT-FIELDS."
  [^BoltTaskStats stats]
  (cleanup-common-stats! (:common stats))
  (doseq [field BOLT-FIELDS]
    (let [stat-atom (get stats field)]
      (cleanup-stat! stat-atom))))
(defn cleanup-spout-stats!
  "Cleans up the common window sets of `stats` and then every window set
  listed in SPOUT-FIELDS."
  [^SpoutTaskStats stats]
  (cleanup-common-stats! (:common stats))
  (doseq [field SPOUT-FIELDS]
    (let [stat-atom (get stats field)]
      (cleanup-stat! stat-atom))))
(defn- value-stats
  "Returns a map from each key in `fields` to the value of the rolling window
  set held by the atom stored under that key in `stats`."
  [stats fields]
  (reduce (fn [acc field]
            (let [window-set (deref (get stats field))]
              (assoc acc field (value-rolling-window-set window-set))))
          {}
          fields))
(defn- value-common-stats
  "Returns the values of the COMMON-FIELDS window sets of `stats` together
  with its :rate."
  [^CommonStats stats]
  (let [field-values (value-stats stats COMMON-FIELDS)]
    (assoc field-values :rate (:rate stats))))
(defn value-bolt-stats!
  "Cleans up the bolt's window sets, then returns one map holding the common
  values, the BOLT-FIELDS values and :type :bolt."
  [^BoltTaskStats stats]
  (cleanup-bolt-stats! stats)
  (let [common-values (value-common-stats (:common stats))
        bolt-values (value-stats stats BOLT-FIELDS)]
    (assoc (merge common-values bolt-values) :type :bolt)))
(defn value-spout-stats!
  "Cleans up the spout's window sets, then returns one map holding the common
  values, the SPOUT-FIELDS values and :type :spout."
  [^SpoutTaskStats stats]
  (cleanup-spout-stats! stats)
  (let [common-values (value-common-stats (:common stats))
        spout-values (value-stats stats SPOUT-FIELDS)]
    (assoc (merge common-values spout-values) :type :spout)))
;; Renders a task stats record into a plain value map. Dispatches on the
;; result of class-selector applied to the argument.
;; NOTE(review): class-selector comes from util; presumably it returns the
;; argument's class — confirm.
(defmulti render-stats! class-selector)
;; Spout stats: delegate to value-spout-stats!, which cleans up before reading.
(defmethod render-stats! SpoutTaskStats [stats]
(value-spout-stats! stats))
;; Bolt stats: delegate to value-bolt-stats!, which cleans up before reading.
(defmethod render-stats! BoltTaskStats [stats]
(value-bolt-stats! stats))
;; Converts a rendered stats map into a TaskSpecificStats, dispatching on the
;; map's :type entry (:bolt or :spout, as set by value-bolt-stats! and
;; value-spout-stats!).
(defmulti thriftify-specific-stats :type)
(defn window-set-converter
  "Converts a two-level map (outer key -> inner key -> value): every outer key
  is turned into a string with str and every inner key is passed through
  `key-fn` (identity when omitted). Values are left untouched."
  ([stats]
    (window-set-converter stats identity))
  ([stats key-fn]
    (let [convert-inner (fn [inner]
                          (into {} (map (fn [[inner-key value]]
                                          [(key-fn inner-key) value])
                                        inner)))]
      (into {} (map (fn [[outer-key inner]]
                      [(str outer-key) (convert-inner inner)])
                    stats)))))
(defn to-global-stream-id
  "Builds a GlobalStreamId from a [component stream] pair."
  [[component stream]]
  (new GlobalStreamId component stream))
;; Bolt variant: the inner keys of :acked, :failed and :process-latencies are
;; [component stream] pairs and are converted to GlobalStreamId objects.
(defmethod thriftify-specific-stats :bolt
  [stats]
  (let [convert (fn [field]
                  (window-set-converter (field stats) to-global-stream-id))
        acked (convert :acked)
        failed (convert :failed)
        process-latencies (convert :process-latencies)]
    (TaskSpecificStats/bolt
      (BoltStats. acked failed process-latencies))))
;; Spout variant: :acked, :failed and :complete-latencies keep their inner
;; keys as they are; only the outer window keys are turned into strings.
(defmethod thriftify-specific-stats :spout
  [stats]
  (let [convert (fn [field] (window-set-converter (field stats)))
        acked (convert :acked)
        failed (convert :failed)
        complete-latencies (convert :complete-latencies)]
    (TaskSpecificStats/spout
      (SpoutStats. acked failed complete-latencies))))
(defn thriftify-task-stats
  "Converts a rendered stats map into a TaskStats built from the converted
  :emitted and :transferred maps and the type-specific stats."
  [stats]
  (let [specific (thriftify-specific-stats stats)
        emitted (window-set-converter (:emitted stats))
        transferred (window-set-converter (:transferred stats))]
    (TaskStats. emitted transferred specific)))