;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements.  See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership.  The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License.  You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns backtype.storm.stats
  (:import [backtype.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent
            NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
            ClusterSummary TopologyInfo TopologySummary ExecutorInfo ExecutorSummary ExecutorStats
            ExecutorSpecificStats SpoutStats BoltStats ErrorInfo
            SupervisorSummary CommonAggregateStats ComponentAggregateStats
            ComponentPageInfo ComponentType BoltAggregateStats
            ExecutorAggregateStats SpecificAggregateStats
            SpoutAggregateStats TopologyPageInfo TopologyStats])
  (:import [backtype.storm.utils Utils])
  (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
  (:use [backtype.storm log util])
  (:use [clojure.math.numeric-tower :only [ceil]]))

;; Ten minutes expressed in seconds (600). Its string form is the window key
;; that compute-agg-capacity looks up, and it also caps the uptime used there.
(def TEN-MIN-IN-SECONDS (* 10 60))

;; Keys of the stat objects held directly by a CommonStats record.
(def COMMON-FIELDS [:emitted :transferred])
;; Stats shared by bolt and spout executors: the emitted and transferred
;; counters plus `rate`, the amount the tuple-recording functions in this
;; namespace add to a counter for each recorded event.
(defrecord CommonStats [^MultiCountStatAndMetric emitted
                        ^MultiCountStatAndMetric transferred
                        rate])

;; Keys of the bolt-specific stat objects held by a BoltExecutorStats record
;; (everything except :common).
(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;; Per-executor stats for a bolt. Here acked and failed count individual
;; tuples; the bolt-*-tuple! functions key every entry by [component stream].
(defrecord BoltExecutorStats [^CommonStats common
                              ^MultiCountStatAndMetric acked
                              ^MultiCountStatAndMetric failed
                              ^MultiLatencyStatAndMetric process-latencies
                              ^MultiCountStatAndMetric executed
                              ^MultiLatencyStatAndMetric execute-latencies])

;; Keys of the spout-specific stat objects held by a SpoutExecutorStats record
;; (everything except :common).
(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;; Per-executor stats for a spout. Here acked and failed count tuple
;; completions; the spout-*-tuple! functions key every entry by stream.
(defrecord SpoutExecutorStats [^CommonStats common
                               ^MultiCountStatAndMetric acked
                               ^MultiCountStatAndMetric failed
                               ^MultiLatencyStatAndMetric complete-latencies])

;; Bucket count handed to every MultiCountStatAndMetric and
;; MultiLatencyStatAndMetric constructed by the mk-*-stats functions.
(def NUM-STAT-BUCKETS 20)

(defn- mk-common-stats
  "Builds a CommonStats record holding fresh emitted and transferred counters
  (NUM-STAT-BUCKETS buckets each) together with the given rate."
  [rate]
  (let [emitted (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
        transferred (MultiCountStatAndMetric. NUM-STAT-BUCKETS)]
    (->CommonStats emitted transferred rate)))

(defn mk-bolt-stats
  "Builds a BoltExecutorStats record for the given rate, with a fresh stat
  object (NUM-STAT-BUCKETS buckets) for every bolt field."
  [rate]
  (let [new-count #(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
        new-latency #(MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)]
    (->BoltExecutorStats (mk-common-stats rate)
                         (new-count)      ; acked
                         (new-count)      ; failed
                         (new-latency)    ; process-latencies
                         (new-count)      ; executed
                         (new-latency)))) ; execute-latencies

(defn mk-spout-stats
  "Builds a SpoutExecutorStats record for the given rate, with a fresh stat
  object (NUM-STAT-BUCKETS buckets) for every spout field."
  [rate]
  (let [new-count #(MultiCountStatAndMetric. NUM-STAT-BUCKETS)]
    (->SpoutExecutorStats (mk-common-stats rate)
                          (new-count)   ; acked
                          (new-count)   ; failed
                          (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))) ; complete-latencies

;; Accessor macros. Each expands to plain keyword lookups on its argument, so
;; the lookup is inlined at the call site.

(defmacro stats-rate
  "Expands to a lookup of :rate inside the :common entry of `stats`."
  [stats]
  `(:rate (:common ~stats)))

(defmacro stats-emitted
  "Expands to a lookup of :emitted inside the :common entry of `stats`."
  [stats]
  `(:emitted (:common ~stats)))

(defmacro stats-transferred
  "Expands to a lookup of :transferred inside the :common entry of `stats`."
  [stats]
  `(:transferred (:common ~stats)))

(defmacro stats-executed
  "Expands to a lookup of :executed on `stats`."
  [stats]
  `(-> ~stats :executed))

(defmacro stats-acked
  "Expands to a lookup of :acked on `stats`."
  [stats]
  `(-> ~stats :acked))

(defmacro stats-failed
  "Expands to a lookup of :failed on `stats`."
  [stats]
  `(-> ~stats :failed))

(defmacro stats-execute-latencies
  "Expands to a lookup of :execute-latencies on `stats`."
  [stats]
  `(-> ~stats :execute-latencies))

(defmacro stats-process-latencies
  "Expands to a lookup of :process-latencies on `stats`."
  [stats]
  `(-> ~stats :process-latencies))

(defmacro stats-complete-latencies
  "Expands to a lookup of :complete-latencies on `stats`."
  [stats]
  `(-> ~stats :complete-latencies))

(defn emitted-tuple!
  "Adds the rate of `stats` to the emitted counter entry for `stream`.

  The counter is bound to a type-hinted local, as the bolt-*-tuple! functions
  do: a hint written on a macro call form is not carried over to the macro's
  expansion, which would leave the `.incBy` call reflective."
  [stats stream]
  (let [^MultiCountStatAndMetric emitted (stats-emitted stats)]
    (.incBy emitted stream (stats-rate stats))))

(defn transferred-tuples!
  "Adds rate * `amt` to the transferred counter entry for `stream`.

  The counter is bound to a type-hinted local, as the bolt-*-tuple! functions
  do: a hint written on a macro call form is not carried over to the macro's
  expansion, which would leave the `.incBy` call reflective."
  [stats stream amt]
  (let [^MultiCountStatAndMetric transferred (stats-transferred stats)]
    (.incBy transferred stream (* (stats-rate stats) amt))))

(defn bolt-execute-tuple!
  "Records one executed tuple for a bolt: adds the rate to the executed counter
  and records `latency-ms` in the execute latencies, both under the key
  [component stream]."
  [^BoltExecutorStats stats component stream latency-ms]
  (let [cid+sid [component stream]
        rate (stats-rate stats)
        ^MultiCountStatAndMetric counter (:executed stats)
        ^MultiLatencyStatAndMetric latencies (:execute-latencies stats)]
    (.incBy counter cid+sid rate)
    (.record latencies cid+sid latency-ms)))

(defn bolt-acked-tuple!
  "Records one acked tuple for a bolt: adds the rate to the acked counter and
  records `latency-ms` in the process latencies, both under the key
  [component stream]."
  [^BoltExecutorStats stats component stream latency-ms]
  (let [cid+sid [component stream]
        rate (stats-rate stats)
        ^MultiCountStatAndMetric counter (:acked stats)
        ^MultiLatencyStatAndMetric latencies (:process-latencies stats)]
    (.incBy counter cid+sid rate)
    (.record latencies cid+sid latency-ms)))

(defn bolt-failed-tuple!
  "Records one failed tuple for a bolt by adding the rate to the failed counter
  under the key [component stream]. `latency-ms` is accepted but not used."
  [^BoltExecutorStats stats component stream latency-ms]
  (let [cid+sid [component stream]
        rate (stats-rate stats)
        ^MultiCountStatAndMetric counter (:failed stats)]
    (.incBy counter cid+sid rate)))

(defn spout-acked-tuple!
  "Records one completed tuple for a spout: adds the rate to the acked counter
  and records `latency-ms` in the complete latencies, both keyed by `stream`.

  The stat objects are bound to type-hinted locals, as the bolt-*-tuple!
  functions do: a hint written on a macro call form is not carried over to the
  macro's expansion, which would leave the interop calls reflective."
  [^SpoutExecutorStats stats stream latency-ms]
  (let [^MultiCountStatAndMetric acked (stats-acked stats)
        ^MultiLatencyStatAndMetric complete-lat (stats-complete-latencies stats)]
    (.incBy acked stream (stats-rate stats))
    (.record complete-lat stream latency-ms)))

(defn spout-failed-tuple!
  "Records one failed tuple for a spout by adding the rate to the failed
  counter entry for `stream`. `latency-ms` is accepted but not used.

  The counter is bound to a type-hinted local, as the bolt-*-tuple! functions
  do: a hint written on a macro call form is not carried over to the macro's
  expansion, which would leave the `.incBy` call reflective."
  [^SpoutExecutorStats stats stream latency-ms]
  (let [^MultiCountStatAndMetric failed (stats-failed stats)]
    (.incBy failed stream (stats-rate stats))))

(defn- cleanup-stat!
  "Calls close on one stat object. The argument is left unhinted because the
  cleanup functions pass both counter and latency stats through here."
  [stat]
  (. stat (close)))

(defn- cleanup-common-stats!
  "Closes every stat object listed in COMMON-FIELDS on the given CommonStats."
  [^CommonStats stats]
  (doseq [stat (map #(% stats) COMMON-FIELDS)]
    (cleanup-stat! stat)))

(defn cleanup-bolt-stats!
  "Closes the common stats of a bolt first, then every stat object listed in
  BOLT-FIELDS."
  [^BoltExecutorStats stats]
  (cleanup-common-stats! (:common stats))
  (doseq [stat (map #(% stats) BOLT-FIELDS)]
    (cleanup-stat! stat)))

(defn cleanup-spout-stats!
  "Closes the common stats of a spout first, then every stat object listed in
  SPOUT-FIELDS."
  [^SpoutExecutorStats stats]
  (cleanup-common-stats! (:common stats))
  (doseq [stat (map #(% stats) SPOUT-FIELDS)]
    (cleanup-stat! stat)))

(defn- value-stats
  "Returns a map from each key in `fields` to a snapshot of the stat object
  stored under that key in `stats`: getTimeCounts for a
  MultiCountStatAndMetric, getTimeLatAvg for anything else (treated as a
  MultiLatencyStatAndMetric)."
  [stats fields]
  (reduce (fn [snapshot field]
            (let [stat (field stats)]
              (assoc snapshot
                     field
                     (if (instance? MultiCountStatAndMetric stat)
                       (.getTimeCounts ^MultiCountStatAndMetric stat)
                       (.getTimeLatAvg ^MultiLatencyStatAndMetric stat)))))
          {}
          fields))

(defn- value-common-stats
  "Returns the snapshot of the COMMON-FIELDS stats with :rate added."
  [^CommonStats stats]
  (assoc (value-stats stats COMMON-FIELDS)
         :rate (:rate stats)))

(defn value-bolt-stats!
  "Returns a flat map of a bolt's stats: the common snapshot (including :rate),
  the BOLT-FIELDS snapshot, and :type :bolt.

  NOTE(review): every stat object is closed via cleanup-bolt-stats! before its
  values are read. Confirm against MultiCountStatAndMetric /
  MultiLatencyStatAndMetric that reading after close is intended."
  [^BoltExecutorStats stats]
  (cleanup-bolt-stats! stats)
  (let [common-values (value-common-stats (:common stats))
        bolt-values (value-stats stats BOLT-FIELDS)]
    (assoc (merge common-values bolt-values)
           :type :bolt)))

(defn value-spout-stats!
  "Returns a flat map of a spout's stats: the common snapshot (including
  :rate), the SPOUT-FIELDS snapshot, and :type :spout.

  NOTE(review): every stat object is closed via cleanup-spout-stats! before its
  values are read. Confirm against MultiCountStatAndMetric /
  MultiLatencyStatAndMetric that reading after close is intended."
  [^SpoutExecutorStats stats]
  (cleanup-spout-stats! stats)
  (let [common-values (value-common-stats (:common stats))
        spout-values (value-stats stats SPOUT-FIELDS)]
    (assoc (merge common-values spout-values)
           :type :spout)))

;; Turns an executor stats record into a flat map of plain values, dispatching
;; on the result of class-selector for the argument.
(defmulti render-stats! class-selector)

;; Bolt records are flattened by value-bolt-stats!.
(defmethod render-stats! BoltExecutorStats
  [bolt-stats]
  (value-bolt-stats! bolt-stats))

;; Spout records are flattened by value-spout-stats!.
(defmethod render-stats! SpoutExecutorStats
  [spout-stats]
  (value-spout-stats! spout-stats))

;; Converts a flat stats map into thrift ExecutorSpecificStats; dispatches on
;; the map's :type value (:bolt or :spout).
(defmulti thriftify-specific-stats :type)
;; Converts thrift BoltStats / SpoutStats into a vector of per-field values;
;; dispatches on the result of class-selector for the argument.
(defmulti clojurify-specific-stats class-selector)

(defn window-set-converter
  "Rebuilds a two-level map, converting its keys and leaving the leaf values
  untouched. With three arguments, `first-key-fun` is applied to the outer keys
  only and `key-fn` to the inner keys only. With two arguments, only the outer
  keys are converted (by `first-key-fun`)."
  ([stats first-key-fun]
   (window-set-converter stats identity first-key-fun))
  ([stats key-fn first-key-fun]
   (let [convert-inner (fn [inner]
                         (into {}
                               (map (fn [[inner-key value]]
                                      [(key-fn inner-key) value])
                                    inner)))]
     (into {}
           (map (fn [[outer-key inner]]
                  [(first-key-fun outer-key) (convert-inner inner)])
                stats)))))

(defn to-global-stream-id
  "Builds a thrift GlobalStreamId from a [component stream] pair."
  [[component stream]]
  (new GlobalStreamId component stream))

(defn from-global-stream-id
  "Returns the [component-id stream-id] pair held by a thrift GlobalStreamId.
  The argument is type-hinted so both getter calls compile to direct method
  invocations instead of reflective ones."
  [^GlobalStreamId global-stream-id]
  [(.get_componentId global-stream-id) (.get_streamId global-stream-id)])

;; Thrift BoltStats -> vector of [acked failed process-ms-avg executed
;; execute-ms-avg]. The inner GlobalStreamId keys of each map become
;; [component stream] pairs; the outer keys are kept as they are.
(defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
  (mapv #(window-set-converter % from-global-stream-id identity)
        [(.get_acked stats)
         (.get_failed stats)
         (.get_process_ms_avg stats)
         (.get_executed stats)
         (.get_execute_ms_avg stats)]))

;; Thrift SpoutStats -> vector of [acked failed complete-ms-avg]; the maps are
;; returned exactly as the thrift getters hand them back.
(defmethod clojurify-specific-stats SpoutStats [^SpoutStats stats]
  (let [acked (.get_acked stats)
        failed (.get_failed stats)
        complete-ms-avg (.get_complete_ms_avg stats)]
    [acked failed complete-ms-avg]))


(defn clojurify-executor-stats
  "Converts thrift ExecutorStats into the flat shape that render-stats!
  produces: the emitted/transferred/rate values merged with :type and the
  bolt- or spout-specific fields, with the :common key removed.

  Worker heartbeats do not store BoltExecutorStats / SpoutExecutorStats
  records; they store the flattened result of render-stats!, so the same
  flattening is reproduced here."
  [^ExecutorStats stats]
  (let [union (.get_specific stats)
        bolt? (.is_set_bolt union)
        specific-values (clojurify-specific-stats
                          (if bolt? (.get_bolt union) (.get_spout union)))
        common (CommonStats. (.get_emitted stats)
                             (.get_transferred stats)
                             (.get_rate stats))
        [type-tag make-record] (if bolt?
                                 [:bolt ->BoltExecutorStats]
                                 [:spout ->SpoutExecutorStats])]
    ;; The leading nil fills the record's :common slot, which is dropped again
    ;; once the record has been merged into the common values.
    (-> common
        (merge {:type type-tag}
               (apply make-record (into [nil] specific-values)))
        (dissoc :common))))

;; Flat bolt stats map -> thrift ExecutorSpecificStats wrapping a BoltStats.
;; Outer keys are stringified; inner [component stream] keys become
;; GlobalStreamIds.
(defmethod thriftify-specific-stats :bolt
  [stats]
  (let [to-thrift (fn [stat-key]
                    (window-set-converter (stat-key stats)
                                          to-global-stream-id
                                          str))]
    (ExecutorSpecificStats/bolt
      (BoltStats. (to-thrift :acked)
                  (to-thrift :failed)
                  (to-thrift :process-latencies)
                  (to-thrift :executed)
                  (to-thrift :execute-latencies)))))

;; Flat spout stats map -> thrift ExecutorSpecificStats wrapping a SpoutStats.
;; Only the outer keys are converted (stringified).
(defmethod thriftify-specific-stats :spout
  [stats]
  (let [to-thrift (fn [stat-key]
                    (window-set-converter (stat-key stats) str))]
    (ExecutorSpecificStats/spout
      (SpoutStats. (to-thrift :acked)
                   (to-thrift :failed)
                   (to-thrift :complete-latencies)))))

(defn thriftify-executor-stats
  "Converts a flat stats map into thrift ExecutorStats. The emitted and
  transferred maps get their outer keys stringified, the specific part comes
  from thriftify-specific-stats, and :rate is passed through."
  [stats]
  (let [specific-stats (thriftify-specific-stats stats)
        to-thrift (fn [stat-key]
                    (window-set-converter (stat-key stats) str))]
    (ExecutorStats. (to-thrift :emitted)
                    (to-thrift :transferred)
                    specific-stats
                    (:rate stats))))

(defn valid-number?
  "Returns true when x is a number other than NaN or an infinity, and false
  for everything else (including nil and non-numbers)."
  [x]
  (if (number? x)
    (not (or (Double/isNaN x)
             (Double/isInfinite x)))
    false))

(defn apply-default
  "Calls `f` on `args` after passing each argument through `defaulting-fn`."
  [f defaulting-fn & args]
  (->> args
       (map defaulting-fn)
       (apply f)))

(defn apply-or-0
  "Calls `f` on `args`, first replacing every argument that is not a
  valid-number? with 0."
  [f & args]
  (let [number-or-0 (fn [x] (if (valid-number? x) x 0))]
    (apply apply-default f number-or-0 args)))

(defn sum-or-0
  "Adds the arguments, counting every argument that is not a valid number
  as 0."
  [& args]
  (apply apply-or-0 + args))

(defn product-or-0
  "Multiplies the arguments, counting every argument that is not a valid
  number as 0."
  [& args]
  (apply apply-or-0 * args))

(defn max-or-0
  "Returns the largest argument, counting every argument that is not a valid
  number as 0."
  [& args]
  (apply apply-or-0 max args))

(defn- agg-bolt-lat-and-count
  "Aggregates a bolt's stats across all of its map keys. Returns the total
  number executed plus the execute and process latency totals, where each
  total is the sum of average * number-executed over the entries of the
  corresponding average map."
  [idk->exec-avg idk->proc-avg idk->num-executed]
  (let [weighted-total (fn [idk->avg]
                         (sum (for [[idk avg] idk->avg]
                                (product-or-0 avg
                                              (get idk->num-executed idk)))))]
    {:executeLatencyTotal (weighted-total idk->exec-avg)
     :processLatencyTotal (weighted-total idk->proc-avg)
     :executed (sum (vals idk->num-executed))}))

(defn- agg-spout-lat-and-count
  "Aggregates a spout's stats across all streams. Returns the total number
  acked plus the complete latency total, which is the sum of
  average * number-acked over the entries of `sid->comp-avg`."
  [sid->comp-avg sid->num-acked]
  (let [weighted (for [[sid avg] sid->comp-avg]
                   (product-or-0 avg (get sid->num-acked sid)))]
    {:completeLatencyTotal (sum weighted)
     :acked (sum (vals sid->num-acked))}))

(defn add-pairs
  "Adds two pairs element-wise. With no arguments returns the identity pair
  [0 0]."
  ([] [0 0])
  ([left right]
   (let [[l1 l2] left
         [r1 r2] right]
     [(+ l1 r1) (+ l2 r2)])))

(defn mk-include-sys-fn
  "Returns a one-argument predicate over stream ids. When `include-sys?` is
  truthy it accepts everything; otherwise it accepts only strings for which
  Utils/isSystemId is false."
  [include-sys?]
  (if-not include-sys?
    (fn [stream]
      (if (string? stream)
        (not (Utils/isSystemId stream))
        false))
    (fn [_] true)))

(defn mk-include-sys-filter
  "Returns a function over a map keyed by stream id. When `include-sys?` is
  truthy the map is returned unchanged (identity); otherwise the function is
  filter-key partially applied to the predicate that rejects system ids."
  [include-sys?]
  (if-not include-sys?
    (partial filter-key (mk-include-sys-fn false))
    identity))

(defn- agg-bolt-streams-lat-and-count
  "For every key of `idk->exec-avg`, returns a map with the number executed and
  the execute and process latency totals (average * number executed).

  Counts are read with `get`, as the sibling aggregation functions do, so a
  missing (nil) `idk->executed` map gives nil counts and zero totals instead of
  a NullPointerException from invoking nil as a function."
  [idk->exec-avg idk->proc-avg idk->executed]
  (into {}
        (for [[idk exec-avg] idk->exec-avg
              :let [num-executed (get idk->executed idk)]]
          [idk {:executeLatencyTotal (product-or-0 exec-avg num-executed)
                :processLatencyTotal (product-or-0 (get idk->proc-avg idk)
                                                   num-executed)
                :executed num-executed}])))

(defn- agg-spout-streams-lat-and-count
  "For every key of `idk->comp-avg`, returns a map with the number acked and
  the complete latency total (average * number acked)."
  [idk->comp-avg idk->acked]
  (into {}
        (map (fn [[idk comp-avg]]
               (let [num-acked (get idk->acked idk)]
                 [idk {:completeLatencyTotal (product-or-0 comp-avg num-acked)
                       :acked num-acked}]))
             idk->comp-avg)))

(defn swap-map-order
  "Inverts the nesting of a two-level map: inner keys become the top-level
  keys and the original top-level keys become the nested keys. Returns nil
  for an empty or nil input.
  Example:
  {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
  -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
  [m]
  (reduce (fn [swapped [outer-key inner-map]]
            (reduce (fn [acc [inner-key value]]
                      (assoc-in acc [inner-key outer-key] value))
                    swapped
                    inner-map))
          ;; An input with entries always yields a map, even when every nested
          ;; map is empty; an input without entries yields nil.
          (when (seq m) {})
          m))

(defn- compute-agg-capacity
  "Computes the capacity metric for one executor given its heartbeat data and
  uptime. Returns nil when `uptime` is nil.

  The result is the sum, over the entries of the ten-minute window, of
  execute-latency average * executed count, divided by
  1000 * min(uptime, TEN-MIN-IN-SECONDS).

  An entry that has a latency but no executed count (or the reverse) is
  skipped rather than breaking the aggregation."
  [m uptime]
  (when uptime
    (let [window (str TEN-MIN-IN-SECONDS)
          idk->exec-avg (get (:execute-latencies m) window)
          idk->executed (get (:executed m) window)
          ;; Starts from 0. so the total is a double even with no entries.
          weighted-total (reduce +
                                 0.
                                 (for [[idk avg] idk->exec-avg
                                       :let [cnt (get idk->executed idk)]
                                       :when (and avg cnt)]
                                   (* avg cnt)))]
      (div weighted-total (* 1000 (min uptime TEN-MIN-IN-SECONDS))))))

(defn agg-pre-merge-comp-page-bolt
  "Reshapes one bolt executor's heartbeat data for the component page, before
  it is merged with other executors. For the given `window` it returns the
  executor's identity fields, its capacity, per-input stats
  (:cid+sid->input-stats: acked, failed, executed and latency totals) and
  per-output-stream stats (:sid->output-stats: emitted, transferred). Output
  streams pass through the filter built from `include-sys?`."
  [{exec-id :exec-id
    host :host
    port :port
    uptime :uptime
    comp-id :comp-id
    num-tasks :num-tasks
    statk->w->sid->num :stats}
   window
   include-sys?]
  (let [;; Stat map for `stat-key`, restricted to the requested window after
        ;; the window keys have been stringified.
        for-window (fn [stat-key]
                     (get (map-key str (stat-key statk->w->sid->num)) window))
        drop-sys (mk-include-sys-filter include-sys?)
        acked+failed (swap-map-order {:acked (for-window :acked)
                                      :failed (for-window :failed)})
        lat+executed (agg-bolt-streams-lat-and-count
                       (for-window :execute-latencies)
                       (for-window :process-latencies)
                       (for-window :executed))
        emitted+transferred (swap-map-order
                              {:emitted (drop-sys (for-window :emitted))
                               :transferred (drop-sys (for-window :transferred))})]
    {:executor-id exec-id
     :host host
     :port port
     :uptime uptime
     :num-executors 1
     :num-tasks num-tasks
     :capacity (compute-agg-capacity statk->w->sid->num uptime)
     :cid+sid->input-stats (merge-with merge acked+failed lat+executed)
     :sid->output-stats emitted+transferred}))

(defn agg-pre-merge-comp-page-spout
  "Reshapes one spout executor's heartbeat data for the component page, before
  it is merged with other executors. For the given `window` it returns the
  executor's identity fields and per-stream stats (:sid->output-stats: acked,
  failed, emitted, transferred and the complete latency total). Emitted and
  transferred pass through the filter built from `include-sys?`."
  [{exec-id :exec-id
    host :host
    port :port
    uptime :uptime
    comp-id :comp-id
    num-tasks :num-tasks
    statk->w->sid->num :stats}
   window
   include-sys?]
  (let [;; Stat map for `stat-key`, restricted to the requested window after
        ;; the window keys have been stringified.
        for-window (fn [stat-key]
                     (get (map-key str (stat-key statk->w->sid->num)) window))
        drop-sys (mk-include-sys-filter include-sys?)
        sid->acked (for-window :acked)
        lat+acked (agg-spout-streams-lat-and-count
                    (for-window :complete-latencies)
                    sid->acked)
        counts (swap-map-order
                 {:acked sid->acked
                  :failed (for-window :failed)
                  :emitted (drop-sys (for-window :emitted))
                  :transferred (drop-sys (for-window :transferred))})]
    {:executor-id exec-id
     :host host
     :port port
     :uptime uptime
     :num-executors 1
     :num-tasks num-tasks
     :sid->output-stats (merge-with merge lat+acked counts)}))

(defn agg-pre-merge-topo-page-bolt
  "Reshapes one bolt executor's heartbeat data for the topology page, before
  it is merged with other executors. Returns a single-entry map from the
  component id to that executor's totals for the given `window`: executed,
  latency totals, emitted, transferred, acked, failed and capacity. Emitted
  and transferred pass through the filter built from `include-sys?`."
  [{comp-id :comp-id
    num-tasks :num-tasks
    statk->w->sid->num :stats
    uptime :uptime}
   window
   include-sys?]
  (let [;; Stat map for `stat-key`, restricted to the requested window after
        ;; the window keys have been stringified.
        for-window (fn [stat-key]
                     (get (map-key str (stat-key statk->w->sid->num)) window))
        drop-sys (mk-include-sys-filter include-sys?)
        total (fn [sid->num] (sum (vals sid->num)))
        lat+executed (agg-bolt-lat-and-count (for-window :execute-latencies)
                                             (for-window :process-latencies)
                                             (for-window :executed))]
    {comp-id
     (merge lat+executed
            {:num-executors 1
             :num-tasks num-tasks
             :emitted (total (drop-sys (for-window :emitted)))
             :transferred (total (drop-sys (for-window :transferred)))
             :capacity (compute-agg-capacity statk->w->sid->num uptime)
             :acked (total (for-window :acked))
             :failed (total (for-window :failed))})}))

(defn agg-pre-merge-topo-page-spout
  "Reshapes one spout executor's heartbeat data for the topology page, before
  it is merged with other executors. Returns a single-entry map from the
  component id to that executor's totals for the given `window`: acked,
  complete latency total, emitted, transferred and failed. Emitted and
  transferred pass through the filter built from `include-sys?`."
  [{comp-id :comp-id
    num-tasks :num-tasks
    statk->w->sid->num :stats}
   window
   include-sys?]
  (let [;; Stat map for `stat-key`, restricted to the requested window after
        ;; the window keys have been stringified.
        for-window (fn [stat-key]
                     (get (map-key str (stat-key statk->w->sid->num)) window))
        drop-sys (mk-include-sys-filter include-sys?)
        total (fn [sid->num] (sum (vals sid->num)))
        lat+acked (agg-spout-lat-and-count (for-window :complete-latencies)
                                           (for-window :acked))]
    {comp-id
     (merge lat+acked
            {:num-executors 1
             :num-tasks num-tasks
             :emitted (total (drop-sys (for-window :emitted)))
             :transferred (total (drop-sys (for-window :transferred)))
             :failed (total (for-window :failed))})}))

(defn merge-agg-comp-stats-comp-page-bolt
  "Merges one bolt executor's pre-merged component-page stats into the
  accumulator. Executor and task counts are increased, the per-input and
  per-output stats are summed key by key, and a summary of this executor is
  conj'ed onto :executor-stats. The summary's latencies are nil when the
  executor has executed nothing."
  [{acc-in :cid+sid->input-stats
    acc-out :sid->output-stats
    :as acc-bolt-stats}
   {bolt-in :cid+sid->input-stats
    bolt-out :sid->output-stats
    :as bolt-stats}]
  (let [sum-nested (partial merge-with (partial merge-with sum-or-0))
        ;; Sum of one stat over every entry of an executor's stats map.
        total (fn [id->stats stat-key]
                (apply sum-or-0 (map stat-key (vals id->stats))))
        executed (total bolt-in :executed)
        executed? (and executed (pos? executed))
        executor-summary
        (merge
          (select-keys bolt-stats
                       [:executor-id :uptime :host :port :capacity])
          {:emitted (total bolt-out :emitted)
           :transferred (total bolt-out :transferred)
           :acked (total bolt-in :acked)
           :failed (total bolt-in :failed)
           :executed executed}
          {:execute-latency (when executed?
                              (div (total bolt-in :executeLatencyTotal)
                                   executed))
           :process-latency (when executed?
                              (div (total bolt-in :processLatencyTotal)
                                   executed))})]
    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
     :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
     :sid->output-stats (sum-nested acc-out bolt-out)
     :cid+sid->input-stats (sum-nested acc-in bolt-in)
     :executor-stats (conj (:executor-stats acc-bolt-stats) executor-summary)}))

(defn merge-agg-comp-stats-comp-page-spout
  "Merges one spout executor's pre-merged component-page stats into the
  accumulator. Executor and task counts are increased, the per-stream stats
  are summed key by key, and a summary of this executor is conj'ed onto
  :executor-stats. The summary's complete latency is nil when the executor
  has acked nothing."
  [{acc-out :sid->output-stats
    :as acc-spout-stats}
   {spout-out :sid->output-stats
    :as spout-stats}]
  (let [;; Sum of one stat over every stream of this executor.
        total (fn [stat-key]
                (apply sum-or-0 (map stat-key (vals spout-out))))
        acked (total :acked)
        executor-summary
        (merge
          (select-keys spout-stats [:executor-id :uptime :host :port])
          {:emitted (total :emitted)
           :transferred (total :transferred)
           :acked acked
           :failed (total :failed)}
          {:complete-latency (when (and acked (pos? acked))
                               (div (total :completeLatencyTotal) acked))})]
    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
     :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats))
     :sid->output-stats (merge-with (partial merge-with sum-or-0)
                                    acc-out
                                    spout-out)
     :executor-stats (conj (:executor-stats acc-spout-stats) executor-summary)}))

(defn merge-agg-comp-stats-topo-page-bolt
  "Merges one bolt executor's topology-page stats into the accumulator for its
  component. Counts are summed and capacity keeps the maximum; missing or
  invalid numbers count as 0.

  Latency totals are summed rather than averaged so that no division is needed
  at each step; divide a total by the executed count to get the average."
  [acc-bolt-stats bolt-stats]
  (let [total (fn [stat-key]
                (sum-or-0 (stat-key acc-bolt-stats) (stat-key bolt-stats)))]
    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
     :num-tasks (total :num-tasks)
     :emitted (total :emitted)
     :transferred (total :transferred)
     :capacity (max-or-0 (:capacity acc-bolt-stats) (:capacity bolt-stats))
     :executeLatencyTotal (total :executeLatencyTotal)
     :processLatencyTotal (total :processLatencyTotal)
     :executed (total :executed)
     :acked (total :acked)
     :failed (total :failed)}))

(defn merge-agg-comp-stats-topo-page-spout
  "Merges one spout executor's topology-page stats into the accumulator for
  its component. Counts are summed; missing or invalid numbers count as 0.

  The complete latency total is summed rather than averaged so that no
  division is needed at each step; divide it by the acked count to get the
  average."
  [acc-spout-stats spout-stats]
  (let [total (fn [stat-key]
                (sum-or-0 (stat-key acc-spout-stats) (stat-key spout-stats)))]
    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
     :num-tasks (total :num-tasks)
     :emitted (total :emitted)
     :transferred (total :transferred)
     :completeLatencyTotal (total :completeLatencyTotal)
     :acked (total :acked)
     :failed (total :failed)}))

(defn aggregate-count-streams
  "Passes `stats` through map-val with a function that adds up the values of a
  nested map. (Presumably map-val, from the util namespace, applies that
  function to every value of `stats` and keeps the keys.)"
  [stats]
  (map-val (fn [sid->count]
             (apply + (vals sid->count)))
           stats))

(defn- agg-topo-exec-stats*
  "A helper function that does the common work to aggregate stats of one
  executor with the given map for the topology page.

  `pre-merge-fn` reshapes the executor's data into a map from component id to
  its totals, and `merge-fn` folds that into the entry stored under `comp-key`
  (:bolt-id->stats or :spout-id->stats) of the accumulator. The per-window
  emitted/transferred totals are updated for every executor; the per-window
  acked/failed totals only for executors whose stats have :type :spout.
  The executor's [host port] is added to :workers-set."
  [window
   include-sys?
   {:keys [workers-set
           bolt-id->stats
           spout-id->stats
           window->emitted
           window->transferred
           window->comp-lat-wgt-avg
           window->acked
           window->failed] :as acc-stats}
   {:keys [stats] :as new-data}
   pre-merge-fn
   merge-fn
   comp-key]
  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
        {w->compLatWgtAvg :completeLatencyTotal
         w->acked :acked}
          (if (:complete-latencies stats)
            ;; Per window: weighted complete-latency total and acked count.
            (swap-map-order
              (into {}
                    (for [w (keys (:acked stats))]
                         [w (agg-spout-lat-and-count
                              (get (:complete-latencies stats) w)
                              (get (:acked stats) w))])))
            ;; No complete latencies: fall back to plain acked counts. The key
            ;; must be :acked to match the destructuring above.
            {:completeLatencyTotal nil
             :acked (aggregate-count-streams (:acked stats))})
        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    (assoc {:workers-set (conj workers-set
                               [(:host new-data) (:port new-data)])
            :bolt-id->stats bolt-id->stats
            :spout-id->stats spout-id->stats
            :window->emitted (->> (:emitted stats)
                                  (map-val handle-sys-components-fn)
                                  aggregate-count-streams
                                  (merge-with + window->emitted))
            :window->transferred (->> (:transferred stats)
                                      (map-val handle-sys-components-fn)
                                      aggregate-count-streams
                                      (merge-with + window->transferred))
            :window->comp-lat-wgt-avg (merge-with +
                                                  window->comp-lat-wgt-avg
                                                  w->compLatWgtAvg)
            :window->acked (if (= :spout (:type stats))
                             (merge-with + window->acked w->acked)
                             window->acked)
            :window->failed (if (= :spout (:type stats))
                              (->> (:failed stats)
                                   aggregate-count-streams
                                   (merge-with + window->failed))
                              window->failed)}
           ;; Fold this executor's totals into its component's entry.
           comp-key (merge-with merge-fn
                                (acc-stats comp-key)
                                cid->statk->num)
           :type (:type stats))))

(defmulti agg-topo-exec-stats
  "Folds the stats of one executor into the accumulated topology stats, for
  the given window and system-component setting. Dispatches on the :type of
  the last argument (the executor's data)."
  (fn dispatch-fn [& args]
    (-> args last :type)))

;; Bolt executors are merged into :bolt-id->stats.
(defmethod agg-topo-exec-stats :bolt
  [window include-sys? acc-stats new-data]
  (agg-topo-exec-stats* window include-sys? acc-stats new-data
                        agg-pre-merge-topo-page-bolt
                        merge-agg-comp-stats-topo-page-bolt
                        :bolt-id->stats))

;; Spout executors are merged into :spout-id->stats.
(defmethod agg-topo-exec-stats :spout
  [window include-sys? acc-stats new-data]
  (agg-topo-exec-stats* window include-sys? acc-stats new-data
                        agg-pre-merge-topo-page-spout
                        merge-agg-comp-stats-topo-page-spout
                        :spout-id->stats))

;; Any other type leaves the accumulator untouched.
(defmethod agg-topo-exec-stats :default
  [_ _ acc-stats _]
  acc-stats)

(defn get-last-error
  "Returns an ErrorInfo built from the :error and :time-secs of the last error
  that the cluster state holds for the component, or nil when it holds none."
  [storm-cluster-state storm-id component-id]
  (when-let [err (.last-error storm-cluster-state storm-id component-id)]
    (ErrorInfo. (:error err) (:time-secs err))))

(defn component-type
  "Returns :bolt or :spout for the component `id` of `topology`, or nil when
  the id is neither. System ids are always reported as :bolt."
  [^StormTopology topology id]
  (let [bolt-ids (.get_bolts topology)
        spout-ids (.get_spouts topology)]
    (if (or (Utils/isSystemId id)
            (.containsKey bolt-ids id))
      :bolt
      (when (.containsKey spout-ids id)
        :spout))))

(defn extract-nodeinfos-from-hb-for-comp
  "Returns the distinct {:host … :port …} maps of the executors in
  `exec->host+port` that belong to `comp-id` (every component when `comp-id`
  is nil). Executors of system components are left out unless `include-sys?`
  is truthy. An executor's component is looked up from its first task id."
  [exec->host+port task->component include-sys? comp-id]
  (let [wanted? (fn [[[start-task] _]]
                  (let [id (task->component start-task)]
                    (and (or (nil? comp-id) (= comp-id id))
                         (or include-sys? (not (Utils/isSystemId id))))))]
    (->> exec->host+port
         (filter wanted?)
         (map (fn [[_ [host port]]]
                {:host host
                 :port port}))
         distinct)))

(defn extract-data-from-hb
  "Returns one map per executor in `exec->host+port` that belongs to `comp-id`
  (every component when `comp-id` is nil or omitted). Executors of system
  components are left out unless `include-sys?` is truthy.

  Each map carries the executor id, component id, task count, host, port, and
  the :uptime and :stats of the executor's heartbeat. :type comes from the
  heartbeat stats and falls back to the component's type in `topology`."
  ([exec->host+port task->component beats include-sys? topology]
   (extract-data-from-hb exec->host+port
                         task->component
                         beats
                         include-sys?
                         topology
                         nil))
  ([exec->host+port task->component beats include-sys? topology comp-id]
   (for [[[start end :as executor] [host port]] exec->host+port
         :let [heartbeat (beats executor)
               id (task->component start)
               hb-stats (:stats heartbeat)]
         :when (and (or (nil? comp-id) (= comp-id id))
                    (or include-sys? (not (Utils/isSystemId id))))]
     {:exec-id executor
      :comp-id id
      ;; An executor covers the task ids start..end inclusive.
      :num-tasks (count (range start (inc end)))
      :host host
      :port port
      :uptime (:uptime heartbeat)
      :stats hb-stats
      :type (or (:type hb-stats)
                (component-type topology id))})))

(defn aggregate-topo-stats
  "Reduces the per-executor maps in `data` into one accumulated topology stats
  map by applying agg-topo-exec-stats to each, starting from an accumulator
  with an empty worker set and empty stat maps."
  [window include-sys? data]
  (reduce (fn [acc-stats new-data]
            (agg-topo-exec-stats window include-sys? acc-stats new-data))
          {:workers-set #{}
           :bolt-id->stats {}
           :spout-id->stats {}
           :window->emitted {}
           :window->transferred {}
           :window->comp-lat-wgt-avg {}
           :window->acked {}
           :window->failed {}}
          data))

(defn- compute-weighted-averages-per-window
  "For every [window total] entry stored under `wgt-avg-key` in `acc-data`,
  divides the total (using `div`) by the value stored for the same window
  under `divisor-key`.

  Windows whose divisor is missing or not positive are left out. The keys of
  the returned map are the windows converted with `str`."
  [acc-data wgt-avg-key divisor-key]
  (let [window->divisor (divisor-key acc-data)]
    (reduce (fn [result [window wgt-avg]]
              (let [divisor (window->divisor window)]
                (if (and divisor (pos? divisor))
                  (assoc result (str window) (div wgt-avg divisor))
                  result)))
            {}
            (wgt-avg-key acc-data))))

(defn- post-aggregate-topo-stats
  "Turns the accumulated topology stats in `acc-data` into the final
  topology-page map.

  - :num-tasks, :num-workers and :num-executors are the sizes of
    `task->component`, the accumulated :workers-set and `exec->node+port`.
  - Every bolt entry gets :execute-latency and :process-latency (its latency
    totals divided by its :executed count, or 0 when nothing was executed),
    loses the raw totals, and gains :lastError from `(last-err-fn id)`.
  - Every spout entry gets :complete-latency (its latency total divided by
    its :acked count, or 0 when nothing was acked), loses the raw total, and
    gains :lastError the same way.
  - The per-window count maps are passed through `(map-key str ...)`, and
    :window->complete-latency is computed from the weighted totals and the
    per-window acked counts."
  [task->component exec->node+port last-err-fn acc-data]
  {:num-tasks (count task->component)
   :num-workers (count (:workers-set acc-data))
   :num-executors (count exec->node+port)
   :bolt-id->stats
     (into {} (for [[id m] (:bolt-id->stats acc-data)
                    :let [executed (:executed m)]]
                [id (-> m
                        (assoc :execute-latency
                               (if (and executed (pos? executed))
                                 (div (or (:executeLatencyTotal m) 0)
                                      executed)
                                 0)
                               :process-latency
                               (if (and executed (pos? executed))
                                 (div (or (:processLatencyTotal m) 0)
                                      executed)
                                 0))
                        (dissoc :executeLatencyTotal
                                :processLatencyTotal)
                        (assoc :lastError (last-err-fn id)))]))
   :spout-id->stats
     (into {} (for [[id m] (:spout-id->stats acc-data)
                    :let [acked (:acked m)]]
                [id (-> m
                        (assoc :complete-latency
                               (if (and acked (pos? acked))
                                 ;; A missing total is treated as 0, matching
                                 ;; how the bolt latency totals are handled
                                 ;; above, instead of passing nil to div.
                                 (div (or (:completeLatencyTotal m) 0)
                                      acked)
                                 0))
                        (dissoc :completeLatencyTotal)
                        (assoc :lastError (last-err-fn id)))]))
   :window->emitted (map-key str (:window->emitted acc-data))
   :window->transferred (map-key str (:window->transferred acc-data))
   :window->complete-latency
     (compute-weighted-averages-per-window acc-data
                                           :window->comp-lat-wgt-avg
                                           :window->acked)
   :window->acked (map-key str (:window->acked acc-data))
   :window->failed (map-key str (:window->failed acc-data))})

(defn- thriftify-common-agg-stats
  "Creates a CommonAggregateStats, copies onto it each of the executor, task,
  emitted, transferred, acked and failed values that is present (truthy) in
  the given stats map, and installs it on `s` with .set_common_stats."
  [^ComponentAggregateStats s
   {:keys [num-tasks
           emitted
           transferred
           acked
           failed
           num-executors] :as statk->num}]
  (let [common (CommonAggregateStats.)]
    (when num-executors
      (.set_num_executors common num-executors))
    (when num-tasks
      (.set_num_tasks common num-tasks))
    (when emitted
      (.set_emitted common emitted))
    (when transferred
      (.set_transferred common transferred))
    (when acked
      (.set_acked common acked))
    (when failed
      (.set_failed common failed))
    (.set_common_stats s common)))

(defn thriftify-bolt-agg-stats
  "Converts the bolt stats map `statk->num` into a ComponentAggregateStats of
  type BOLT.

  The last error, the common stats, and the bolt-specific execute latency,
  process latency, executed count and capacity are each set only when the
  corresponding value is present (truthy) in the map."
  [statk->num]
  (let [{:keys [lastError
                execute-latency
                process-latency
                executed
                capacity]} statk->num
        agg-stats (ComponentAggregateStats.)
        bolt-stats (BoltAggregateStats.)]
    (.set_type agg-stats ComponentType/BOLT)
    (when lastError
      (.set_last_error agg-stats lastError))
    (thriftify-common-agg-stats agg-stats statk->num)
    (when execute-latency
      (.set_execute_latency_ms bolt-stats execute-latency))
    (when process-latency
      (.set_process_latency_ms bolt-stats process-latency))
    (when executed
      (.set_executed bolt-stats executed))
    (when capacity
      (.set_capacity bolt-stats capacity))
    (.set_specific_stats agg-stats (SpecificAggregateStats/bolt bolt-stats))
    agg-stats))

(defn thriftify-spout-agg-stats
  "Converts the spout stats map `statk->num` into a ComponentAggregateStats of
  type SPOUT.

  The last error, the common stats and the spout-specific complete latency
  are each set only when the corresponding value is present (truthy) in the
  map."
  [statk->num]
  (let [{:keys [lastError
                complete-latency]} statk->num
        agg-stats (ComponentAggregateStats.)
        spout-stats (SpoutAggregateStats.)]
    (.set_type agg-stats ComponentType/SPOUT)
    (when lastError
      (.set_last_error agg-stats lastError))
    (thriftify-common-agg-stats agg-stats statk->num)
    (when complete-latency
      (.set_complete_latency_ms spout-stats complete-latency))
    (.set_specific_stats agg-stats (SpecificAggregateStats/spout spout-stats))
    agg-stats))

(defn thriftify-topo-page-data
  "Builds the TopologyPageInfo thrift object for `topology-id` from the
  post-aggregated map `data`.

  Every spout and bolt stats map is tagged with its :type and converted with
  thriftify-spout-agg-stats / thriftify-bolt-agg-stats; the per-window maps
  are placed on a TopologyStats that is attached to the page info."
  [topology-id data]
  (let [{:keys [num-tasks
                num-workers
                num-executors
                spout-id->stats
                bolt-id->stats
                window->emitted
                window->transferred
                window->complete-latency
                window->acked
                window->failed]} data
        ;; Tags each stats map with its component type and thriftifies it.
        thriftify-all (fn [comp-type thriftify-fn id->stats]
                        (into {}
                              (for [[id m] id->stats]
                                [id (thriftify-fn (assoc m :type comp-type))])))
        spout-agg-stats (thriftify-all :spout
                                       thriftify-spout-agg-stats
                                       spout-id->stats)
        bolt-agg-stats (thriftify-all :bolt
                                      thriftify-bolt-agg-stats
                                      bolt-id->stats)
        topology-stats (TopologyStats.)
        topo-page-info (TopologyPageInfo. topology-id)]
    (.set_window_to_emitted topology-stats window->emitted)
    (.set_window_to_transferred topology-stats window->transferred)
    (.set_window_to_complete_latencies_ms topology-stats
                                          window->complete-latency)
    (.set_window_to_acked topology-stats window->acked)
    (.set_window_to_failed topology-stats window->failed)
    (.set_num_tasks topo-page-info num-tasks)
    (.set_num_workers topo-page-info num-workers)
    (.set_num_executors topo-page-info num-executors)
    (.set_id_to_spout_agg_stats topo-page-info spout-agg-stats)
    (.set_id_to_bolt_agg_stats topo-page-info bolt-agg-stats)
    (.set_topology_stats topo-page-info topology-stats)
    topo-page-info))

(defn agg-topo-execs-stats
  "Produces the topology-page thrift data for `topology-id` from executor
  heartbeats: extracts one map per executor, aggregates them for `window`,
  post-processes the totals and thriftifies the result."
  [topology-id
   exec->node+port
   task->component
   beats
   topology
   window
   include-sys?
   last-err-fn]
  (let [;; Lazy sequence, so the executors are walked only once, during the
        ;; aggregation below.
        exec-data (extract-data-from-hb exec->node+port
                                        task->component
                                        beats
                                        include-sys?
                                        topology)
        aggregated (aggregate-topo-stats window include-sys? exec-data)
        finalized (post-aggregate-topo-stats task->component
                                             exec->node+port
                                             last-err-fn
                                             aggregated)]
    (thriftify-topo-page-data topology-id finalized)))

(defn- agg-bolt-exec-win-stats
  "Merges the windowed stats of one bolt executor (`new-stats`) into the
  running per-window totals held in `acc-stats`, adding values with `+`.

  Emitted and transferred counts first go through the filter built by
  `(mk-include-sys-filter include-sys?)`; acked and failed counts do not."
  [acc-stats new-stats include-sys?]
  (let [window->lat+count
          (into {}
                (for [w (keys (:executed new-stats))]
                  [w (agg-bolt-lat-and-count
                       (get (:execute-latencies new-stats) w)
                       (get (:process-latencies new-stats) w)
                       (get (:executed new-stats) w))]))
        {exec-lat-totals :executeLatencyTotal
         proc-lat-totals :processLatencyTotal
         executed-counts :executed} (swap-map-order window->lat+count)
        sys-filter (mk-include-sys-filter include-sys?)
        ;; Adds a per-window map onto the totals stored under `k`.
        add-to-acc (fn [k window->num]
                     (merge-with + (k acc-stats) window->num))
        filtered-counts (fn [k]
                          (aggregate-count-streams
                            (map-val sys-filter (k new-stats))))
        plain-counts (fn [k]
                       (aggregate-count-streams (k new-stats)))]
    {:window->emitted (add-to-acc :window->emitted
                                  (filtered-counts :emitted))
     :window->transferred (add-to-acc :window->transferred
                                      (filtered-counts :transferred))
     :window->exec-lat-wgt-avg (add-to-acc :window->exec-lat-wgt-avg
                                           exec-lat-totals)
     :window->proc-lat-wgt-avg (add-to-acc :window->proc-lat-wgt-avg
                                           proc-lat-totals)
     :window->executed (add-to-acc :window->executed executed-counts)
     :window->acked (add-to-acc :window->acked (plain-counts :acked))
     :window->failed (add-to-acc :window->failed (plain-counts :failed))}))

(defn- agg-spout-exec-win-stats
  "Merges the windowed stats of one spout executor (`new-stats`) into the
  running per-window totals held in `acc-stats`, adding values with `+`.

  Emitted and transferred counts first go through the filter built by
  `(mk-include-sys-filter include-sys?)`; acked and failed counts do not."
  [acc-stats new-stats include-sys?]
  (let [window->lat+count
          (into {}
                (for [w (keys (:acked new-stats))]
                  [w (agg-spout-lat-and-count
                       (get (:complete-latencies new-stats) w)
                       (get (:acked new-stats) w))]))
        {comp-lat-totals :completeLatencyTotal} (swap-map-order
                                                  window->lat+count)
        sys-filter (mk-include-sys-filter include-sys?)
        ;; Adds a per-window map onto the totals stored under `k`.
        add-to-acc (fn [k window->num]
                     (merge-with + (k acc-stats) window->num))
        filtered-counts (fn [k]
                          (aggregate-count-streams
                            (map-val sys-filter (k new-stats))))
        plain-counts (fn [k]
                       (aggregate-count-streams (k new-stats)))]
    {:window->emitted (add-to-acc :window->emitted
                                  (filtered-counts :emitted))
     :window->transferred (add-to-acc :window->transferred
                                      (filtered-counts :transferred))
     :window->comp-lat-wgt-avg (add-to-acc :window->comp-lat-wgt-avg
                                           comp-lat-totals)
     :window->acked (add-to-acc :window->acked (plain-counts :acked))
     :window->failed (add-to-acc :window->failed (plain-counts :failed))}))

(defmulti agg-comp-exec-stats
  "Folds the aggregate stats of a single executor into the accumulated map,
  using the requested window and honoring the include-system-components flag.
  Dispatches on the :type of the third argument (the accumulated map)."
  (fn [_window _include-sys? acc-stats _new-data]
    (:type acc-stats)))

;; Bolt case: the windowed totals come from agg-bolt-exec-win-stats, the
;; component-page stats under :stats are merged with the pre-merged data of
;; the new executor, and :type is kept as :bolt.
(defmethod agg-comp-exec-stats :bolt
  [window include-sys? acc-stats new-data]
  (let [win-stats (agg-bolt-exec-win-stats acc-stats
                                           (:stats new-data)
                                           include-sys?)
        page-stats (merge-agg-comp-stats-comp-page-bolt
                     (:stats acc-stats)
                     (agg-pre-merge-comp-page-bolt new-data
                                                   window
                                                   include-sys?))]
    (assoc win-stats
           :stats page-stats
           :type :bolt)))

;; Spout case: the windowed totals come from agg-spout-exec-win-stats, the
;; component-page stats under :stats are merged with the pre-merged data of
;; the new executor, and :type is kept as :spout.
(defmethod agg-comp-exec-stats :spout
  [window include-sys? acc-stats new-data]
  (let [win-stats (agg-spout-exec-win-stats acc-stats
                                            (:stats new-data)
                                            include-sys?)
        page-stats (merge-agg-comp-stats-comp-page-spout
                     (:stats acc-stats)
                     (agg-pre-merge-comp-page-spout new-data
                                                    window
                                                    include-sys?))]
    (assoc win-stats
           :stats page-stats
           :type :spout)))

(defn- aggregate-comp-stats*
  "Reduces the per-executor maps in `data` with agg-comp-exec-stats (called
  with `window` and `include-sys?`), starting from `init-val`."
  [window include-sys? data init-val]
  (reduce (fn [acc exec-data]
            (agg-comp-exec-stats window include-sys? acc exec-data))
          init-val
          data))

(defmulti aggregate-comp-stats
  "Aggregates per-executor data for one component. Dispatches on the :type of
  the first element of the last argument."
  (fn [& args]
    (let [data (last args)]
      (:type (first data)))))

;; Bolt case: appends an empty bolt accumulator to the received arguments and
;; hands everything to aggregate-comp-stats*.
(defmethod aggregate-comp-stats :bolt
  [& args]
  (let [empty-acc {:type :bolt
                   :cid+sid->input-stats {}
                   :sid->output-stats {}
                   :executor-stats []
                   :window->emitted {}
                   :window->transferred {}
                   :window->exec-lat-wgt-avg {}
                   :window->executed {}
                   :window->proc-lat-wgt-avg {}
                   :window->acked {}
                   :window->failed {}}]
    (apply aggregate-comp-stats* (conj (vec args) empty-acc))))

;; Spout case: appends an empty spout accumulator to the received arguments
;; and hands everything to aggregate-comp-stats*.
(defmethod aggregate-comp-stats :spout
  [& args]
  (let [empty-acc {:type :spout
                   :sid->output-stats {}
                   :executor-stats []
                   :window->emitted {}
                   :window->transferred {}
                   :window->comp-lat-wgt-avg {}
                   :window->acked {}
                   :window->failed {}}]
    (apply aggregate-comp-stats* (conj (vec args) empty-acc))))

;; Fallback for any other dispatch value (including nil): nothing to
;; aggregate, so the result is an empty map.
(defmethod aggregate-comp-stats :default
  [& _ignored]
  {})

(defmulti post-aggregate-comp-stats
  "Finalizes the accumulated stats of one component. Dispatches on the :type
  of the third argument."
  (fn [_task->component _exec->host+port acc-data]
    (:type acc-data)))

;; Bolt case. Builds the final component-page map from the accumulated data:
;; every input-stats entry gets :execute-latency and :process-latency (its
;; latency totals divided by its :executed count, or 0 when nothing was
;; executed) and loses the raw totals; output stats and executor stats are
;; passed through; the per-window maps go through (map-key str ...), and the
;; per-window latencies are derived from the weighted totals and the
;; per-window executed counts.
(defmethod post-aggregate-comp-stats :bolt
  [task->component exec->host+port acc-data]
  (let [comp-stats (:stats acc-data)
        finalize-input
          (fn [m]
            (let [executed (:executed m)
                  any-executed? (and executed (pos? executed))
                  avg-of (fn [total-key]
                           (if any-executed?
                             (div (or (total-key m) 0) executed)
                             0))]
              (-> m
                  (merge {:execute-latency (avg-of :executeLatencyTotal)
                          :process-latency (avg-of :processLatencyTotal)})
                  (dissoc :executeLatencyTotal
                          :processLatencyTotal))))
        str-keyed (fn [k] (map-key str (k acc-data)))]
    {:type (:type acc-data)
     :num-tasks (:num-tasks comp-stats)
     :num-executors (:num-executors comp-stats)
     :cid+sid->input-stats (map-val finalize-input
                                    (:cid+sid->input-stats comp-stats))
     :sid->output-stats (:sid->output-stats comp-stats)
     :executor-stats (:executor-stats comp-stats)
     :window->emitted (str-keyed :window->emitted)
     :window->transferred (str-keyed :window->transferred)
     :window->execute-latency
       (compute-weighted-averages-per-window acc-data
                                             :window->exec-lat-wgt-avg
                                             :window->executed)
     :window->executed (str-keyed :window->executed)
     :window->process-latency
       (compute-weighted-averages-per-window acc-data
                                             :window->proc-lat-wgt-avg
                                             :window->executed)
     :window->acked (str-keyed :window->acked)
     :window->failed (str-keyed :window->failed)}))

;; Spout case. Builds the final component-page map from the accumulated data:
;; every output-stats entry gets :complete-latency (its latency total divided
;; by its :acked count, or 0 when nothing was acked) and loses the raw total;
;; executor stats are passed through; the per-window maps go through
;; (map-key str ...), and the per-window complete latency is derived from the
;; weighted totals and the per-window acked counts.
(defmethod post-aggregate-comp-stats :spout
  [task->component exec->host+port acc-data]
  (let [comp-stats (:stats acc-data)
        finalize-output
          (fn [m]
            (let [acked (:acked m)
                  complete-latency (if (and acked (pos? acked))
                                     (div (or (:completeLatencyTotal m) 0)
                                          acked)
                                     0)]
              (-> m
                  (merge {:complete-latency complete-latency})
                  (dissoc :completeLatencyTotal))))
        str-keyed (fn [k] (map-key str (k acc-data)))]
    {:type (:type acc-data)
     :num-tasks (:num-tasks comp-stats)
     :num-executors (:num-executors comp-stats)
     :sid->output-stats (map-val finalize-output
                                 (:sid->output-stats comp-stats))
     :executor-stats (:executor-stats comp-stats)
     :window->emitted (str-keyed :window->emitted)
     :window->transferred (str-keyed :window->transferred)
     :window->complete-latency
       (compute-weighted-averages-per-window acc-data
                                             :window->comp-lat-wgt-avg
                                             :window->acked)
     :window->acked (str-keyed :window->acked)
     :window->failed (str-keyed :window->failed)}))

;; Fallback for any other dispatch value (including nil): there is nothing to
;; finalize, so the result is an empty map.
(defmethod post-aggregate-comp-stats :default
  [& _ignored]
  {})

(defn thriftify-exec-agg-stats
  "Builds an ExecutorAggregateStats for one executor of component `comp-id`.

  The executor summary is built from the stats map's :executor-id (a pair
  used as the two ExecutorInfo constructor arguments), :host, :port and
  :uptime (0 when absent). The stats themselves are thriftified with the bolt
  or spout converter chosen by `comp-type`; any other `comp-type` matches no
  clause and raises IllegalArgumentException."
  [comp-id comp-type {:keys [executor-id host port uptime] :as stats}]
  (let [agg (ExecutorAggregateStats.)
        mk-exec-info (fn [task-start task-end]
                       (ExecutorInfo. task-start task-end))
        summary (ExecutorSummary. (apply mk-exec-info executor-id)
                                  comp-id
                                  host
                                  port
                                  (or uptime 0))]
    (.set_exec_summary agg summary)
    (.set_stats agg (case comp-type
                      :bolt (thriftify-bolt-agg-stats stats)
                      :spout (thriftify-spout-agg-stats stats)))
    agg))

(defn- thriftify-bolt-input-stats
  "Returns a map in which every key of `cid+sid->input-stats` has been
  converted with to-global-stream-id and every value with
  thriftify-bolt-agg-stats."
  [cid+sid->input-stats]
  (into {}
        (map (fn [[cid+sid input-stats]]
               [(to-global-stream-id cid+sid)
                (thriftify-bolt-agg-stats input-stats)])
             cid+sid->input-stats)))

(defn- thriftify-bolt-output-stats
  "Applies thriftify-bolt-agg-stats to every value of `sid->output-stats`
  via map-val."
  [sid->output-stats]
  (->> sid->output-stats
       (map-val thriftify-bolt-agg-stats)))

(defn- thriftify-spout-output-stats
  "Applies thriftify-spout-agg-stats to every value of `sid->output-stats`
  via map-val."
  [sid->output-stats]
  (->> sid->output-stats
       (map-val thriftify-spout-agg-stats)))

(defn thriftify-comp-page-data
  "Builds the ComponentPageInfo thrift object for component `comp-id` of
  topology `topo-id` from the post-aggregated map `data`.

  The per-window stats are selected by (:type data), whereas the thrift
  component type and the bolt/spout converters are selected by looking
  `comp-id` up in `topology` with component-type. An id that resolves to
  neither :bolt nor :spout matches no clause and raises
  IllegalArgumentException. The topology name is set to nil here, and the
  executor/task counts and input stats are set only when present."
  [topo-id topology comp-id data]
  (let [common-stats {:emitted (:window->emitted data)
                      :transferred (:window->transferred data)
                      :acked (:window->acked data)
                      :failed (:window->failed data)}
        specific-stats (case (:type data)
                         :bolt {:execute-latency (:window->execute-latency data)
                                :process-latency (:window->process-latency data)
                                :executed (:window->executed data)}
                         :spout {:complete-latency
                                 (:window->complete-latency data)}
                         {}) ; neither bolt nor spout data
        window->raw-stats (swap-map-order (merge common-stats specific-stats))
        executor-stats (:executor-stats data)
        [compType exec-stats w->stats gsid->input-stats sid->output-stats]
          (case (component-type topology comp-id)
            :bolt [ComponentType/BOLT
                   (map (partial thriftify-exec-agg-stats comp-id :bolt)
                        executor-stats)
                   (map-val thriftify-bolt-agg-stats window->raw-stats)
                   (thriftify-bolt-input-stats (:cid+sid->input-stats data))
                   (thriftify-bolt-output-stats (:sid->output-stats data))]
            :spout [ComponentType/SPOUT
                    (map (partial thriftify-exec-agg-stats comp-id :spout)
                         executor-stats)
                    (map-val thriftify-spout-agg-stats window->raw-stats)
                    nil ;; spouts do not have input stats
                    (thriftify-spout-output-stats (:sid->output-stats data))])
        num-executors (:num-executors data)
        num-tasks (:num-tasks data)
        ret (ComponentPageInfo. comp-id compType)]
    (.set_topology_id ret topo-id)
    (.set_topology_name ret nil)
    (.set_window_to_stats ret w->stats)
    (.set_sid_to_output_stats ret sid->output-stats)
    (.set_exec_stats ret exec-stats)
    (when num-executors
      (.set_num_executors ret num-executors))
    (when num-tasks
      (.set_num_tasks ret num-tasks))
    (when gsid->input-stats
      (.set_gsid_to_input_stats ret gsid->input-stats))
    ret))

(defn agg-comp-execs-stats
  "Produces the component-page thrift data for `component-id` from executor
  heartbeats: extracts one map per matching executor, aggregates them for
  `window`, post-processes the totals and thriftifies the result."
  [exec->host+port
   task->component
   beats
   window
   include-sys?
   topology-id
   topology
   component-id]
  (let [;; Lazy sequence, so the executors are walked only once, during the
        ;; aggregation below.
        exec-data (extract-data-from-hb exec->host+port
                                        task->component
                                        beats
                                        include-sys?
                                        topology
                                        component-id)
        aggregated (aggregate-comp-stats window include-sys? exec-data)
        finalized (post-aggregate-comp-stats task->component
                                             exec->host+port
                                             aggregated)]
    (thriftify-comp-page-data topology-id topology component-id finalized)))

(defn expand-averages
  "Given nested slice -> stream maps of averages (`avg`) and of counts
  (`counts`), both first passed through clojurify-structure, returns a
  slice -> stream -> [count * average, count] map covering every slice and
  stream found in `counts`."
  [avg counts]
  (let [slice->avgs (clojurify-structure avg)
        slice->counts (clojurify-structure counts)
        expand-streams (fn [slice stream->count]
                         (into {}
                               (map (fn [[stream c]]
                                      [stream
                                       [(* c (get-in slice->avgs [slice stream]))
                                        c]])
                                    stream->count)))]
    (into {}
          (map (fn [[slice stream->count]]
                 [slice (expand-streams slice stream->count)])
               slice->counts))))

(defn expand-averages-seq
  "Expands each average/count pair taken position-wise from `average-seq` and
  `counts-seq` with expand-averages, then merges the expanded maps, combining
  the pairs of matching slice and stream with add-pairs."
  [average-seq counts-seq]
  (let [merge-streams (fn [s1 s2] (merge-with add-pairs s1 s2))
        expanded (map expand-averages average-seq counts-seq)]
    (apply merge-with merge-streams expanded)))

(defn- val-avg
  "Averages a [total count] pair.

  Returns 0 when the total is exactly 0 or when the count is zero; otherwise
  returns total / count as a double."
  [[t c]]
  (cond
    (= t 0) 0
    ;; A zero count can arrive together with a floating-point total of 0.0,
    ;; which (= t 0) does not match; dividing would then throw
    ;; ArithmeticException (divide by zero).
    (zero? c) 0
    :else (double (/ t c))))

(defn aggregate-averages
  "Combines the averages in `average-seq` with the matching counts in
  `counts-seq` via expand-averages-seq and reduces every resulting pair to a
  single average with val-avg, keeping the slice -> stream nesting."
  [average-seq counts-seq]
  (let [average-streams (fn [stream->pair]
                          (map-val val-avg stream->pair))]
    (map-val average-streams
             (expand-averages-seq average-seq counts-seq))))

(defn aggregate-avg-streams
  "Collapses the streams of every slice into one average: the averages are
  expanded with expand-averages, the pairs of all streams in a slice are
  summed with add-pairs, and each summed pair is averaged with val-avg."
  [avg counts]
  (let [sum-streams (fn [stream->pair]
                      (reduce add-pairs (vals stream->pair)))
        slice->total (map-val sum-streams (expand-averages avg counts))]
    (map-val val-avg slice->total)))

(defn pre-process
  "Returns `stream-summary` with the per-window maps under :emitted and
  :transferred replaced by copies whose entries were filtered with filter-key
  and the predicate built by `(mk-include-sys-fn include-sys?)`."
  [stream-summary include-sys?]
  (let [keep-stream? (mk-include-sys-fn include-sys?)
        filter-windows (fn [window->stat]
                         (into {}
                               (map (fn [[window stat]]
                                      [window (filter-key keep-stream? stat)])
                                    window->stat)))]
    (reduce (fn [summary k]
              (-> summary
                  (dissoc k)
                  (assoc k (filter-windows (get stream-summary k)))))
            stream-summary
            [:emitted :transferred])))

(defn aggregate-counts
  "Merges the nested count maps in `counts-seq` (each first passed through
  clojurify-structure), adding the numbers found under the same outer and
  inner keys."
  [counts-seq]
  (let [add-inner (fn [s1 s2] (merge-with + s1 s2))]
    (apply merge-with add-inner (map clojurify-structure counts-seq))))

(defn aggregate-common-stats
  "Aggregates the emitted and transferred counts of every ExecutorStats in
  `stats-seq` with aggregate-counts."
  [stats-seq]
  (let [emitted-seq (for [^ExecutorStats s stats-seq]
                      (.get_emitted s))
        transferred-seq (for [^ExecutorStats s stats-seq]
                          (.get_transferred s))]
    {:emitted (aggregate-counts emitted-seq)
     :transferred (aggregate-counts transferred-seq)}))

(defn aggregate-bolt-stats
  "Aggregates the bolt ExecutorStats in `stats-seq` (wrapped with collectify
  first) into one map.

  The common emitted/transferred counts are aggregated and pre-processed with
  `include-sys?`; acked, failed and executed counts are summed; process
  latencies are averaged weighted by the acked counts and execute latencies
  weighted by the executed counts."
  [stats-seq include-sys?]
  (let [stats-seq (collectify stats-seq)
        bolt-stats-seq (map (fn [^ExecutorStats s]
                              (.. s get_specific get_bolt))
                            stats-seq)
        acked-seq (map (fn [^BoltStats b] (.get_acked b)) bolt-stats-seq)
        failed-seq (map (fn [^BoltStats b] (.get_failed b)) bolt-stats-seq)
        executed-seq (map (fn [^BoltStats b] (.get_executed b)) bolt-stats-seq)
        process-avg-seq (map (fn [^BoltStats b] (.get_process_ms_avg b))
                             bolt-stats-seq)
        execute-avg-seq (map (fn [^BoltStats b] (.get_execute_ms_avg b))
                             bolt-stats-seq)]
    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
           {:acked (aggregate-counts acked-seq)
            :failed (aggregate-counts failed-seq)
            :executed (aggregate-counts executed-seq)
            :process-latencies (aggregate-averages process-avg-seq acked-seq)
            :execute-latencies (aggregate-averages execute-avg-seq
                                                   executed-seq)})))

(defn aggregate-spout-stats
  "Aggregates the spout ExecutorStats in `stats-seq` (wrapped with collectify
  first) into one map.

  The common emitted/transferred counts are aggregated and pre-processed with
  `include-sys?`; acked and failed counts are summed; complete latencies are
  averaged weighted by the acked counts."
  [stats-seq include-sys?]
  (let [stats-seq (collectify stats-seq)
        spout-stats-seq (map (fn [^ExecutorStats s]
                               (.. s get_specific get_spout))
                             stats-seq)
        acked-seq (map (fn [^SpoutStats sp] (.get_acked sp)) spout-stats-seq)
        failed-seq (map (fn [^SpoutStats sp] (.get_failed sp)) spout-stats-seq)
        complete-avg-seq (map (fn [^SpoutStats sp] (.get_complete_ms_avg sp))
                              spout-stats-seq)]
    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
           {:acked (aggregate-counts acked-seq)
            :failed (aggregate-counts failed-seq)
            :complete-latencies (aggregate-averages complete-avg-seq
                                                    acked-seq)})))

(defn get-filled-stats
  "Returns, lazily, the stats of every ExecutorSummary in `summs` whose stats
  satisfy not-nil?."
  [summs]
  (for [^ExecutorSummary summ summs
        :let [stats (.get_stats summ)]
        :when (not-nil? stats)]
    stats))

(defn aggregate-spout-streams
  "Collapses the per-stream spout stats in `stats` across streams: the counts
  with aggregate-count-streams, and the complete latencies with
  aggregate-avg-streams using the acked counts."
  [stats]
  (let [{:keys [acked failed emitted transferred complete-latencies]} stats]
    {:acked (aggregate-count-streams acked)
     :failed (aggregate-count-streams failed)
     :emitted (aggregate-count-streams emitted)
     :transferred (aggregate-count-streams transferred)
     :complete-latencies (aggregate-avg-streams complete-latencies acked)}))

(defn spout-streams-stats
  "Takes the stats that are present in the executor summaries `summs`,
  aggregates them as spout stats (honoring `include-sys?`) and collapses the
  result across streams."
  [summs include-sys?]
  (-> (get-filled-stats summs)
      (aggregate-spout-stats include-sys?)
      aggregate-spout-streams))

(defn aggregate-bolt-streams
  "Collapses the per-stream bolt stats in `stats` across streams: the counts
  with aggregate-count-streams, the process latencies with
  aggregate-avg-streams using the acked counts, and the execute latencies
  using the executed counts."
  [stats]
  (let [{:keys [acked failed emitted transferred executed
                process-latencies execute-latencies]} stats]
    {:acked (aggregate-count-streams acked)
     :failed (aggregate-count-streams failed)
     :emitted (aggregate-count-streams emitted)
     :transferred (aggregate-count-streams transferred)
     :process-latencies (aggregate-avg-streams process-latencies acked)
     :executed (aggregate-count-streams executed)
     :execute-latencies (aggregate-avg-streams execute-latencies executed)}))

(defn compute-executor-capacity
  "Computes (executed * execute-latency) / (1000 * window) for executor
  summary `e`, using the executor's aggregated bolt stats for the ten-minute
  window. `window` is the executor's uptime in seconds, capped at
  TEN-MIN-IN-SECONDS. Missing stats count as zero. Returns nil when the
  window is not positive."
  [^ExecutorSummary e]
  (let [ten-min-stats (when-let [stats (.get_stats e)]
                        (-> stats
                            (aggregate-bolt-stats true)
                            aggregate-bolt-streams
                            swap-map-order
                            (get (str TEN-MIN-IN-SECONDS))))
        uptime (nil-to-zero (.get_uptime_secs e))
        window (min uptime TEN-MIN-IN-SECONDS)
        executed (nil-to-zero (:executed ten-min-stats))
        latency (nil-to-zero (:execute-latencies ten-min-stats))]
    (when (pos? window)
      (div (* executed latency) (* 1000 window)))))

(defn bolt-streams-stats
  "Takes the stats that are present in the executor summaries `summs`,
  aggregates them as bolt stats (honoring `include-sys?`) and collapses the
  result across streams."
  [summs include-sys?]
  (-> (get-filled-stats summs)
      (aggregate-bolt-stats include-sys?)
      aggregate-bolt-streams))

(defn total-aggregate-stats
  "Aggregates the stats of the given spout and bolt executor summaries and
  merges them, adding the values found under the same stat and window.

  All aggregated spout stats take part, but only :emitted and :transferred
  are taken from the bolts."
  [spout-summs bolt-summs include-sys?]
  (let [agg-spout-stats (aggregate-spout-streams
                          (aggregate-spout-stats
                            (get-filled-stats spout-summs)
                            include-sys?))
        agg-bolt-stats (aggregate-bolt-streams
                         (aggregate-bolt-stats
                           (get-filled-stats bolt-summs)
                           include-sys?))
        ;; Keep only the keys that will be used. Acked and failed should be
        ;; counted only for the "tuple trees," so those keys are not taken
        ;; from the bolt executors.
        bolt-counts (select-keys agg-bolt-stats [:emitted :transferred])
        add-windows (fn [w1 w2] (merge-with + w1 w2))]
    (merge-with add-windows bolt-counts agg-spout-stats)))

(defn error-subset
  "Returns a string made of at most the first 200 characters of `error-str`."
  [error-str]
  (->> error-str
       (take 200)
       (apply str)))

(defn most-recent-error
  "Returns the (truncated, see error-subset) text of the ErrorInfo in
  `errors-list` that sorts last by error time, or an empty string when the
  list has no errors."
  [errors-list]
  (if-let [^ErrorInfo newest (last (sort-by #(.get_error_time_secs ^ErrorInfo %)
                                            errors-list))]
    (error-subset (.get_error newest))
    ""))

(defn float-str
  "Formats `n` as a float with three decimal places; a nil or false `n`
  yields the string 0."
  [n]
  (if-not n
    "0"
    (format "%.3f" (float n))))

(defn compute-bolt-capacity
  "Returns the largest capacity among `executors`, as computed per executor
  by compute-executor-capacity with nil results counted as zero.

  Returns 0 when `executors` is empty, because calling max with no arguments
  would throw."
  [executors]
  (let [capacities (->> executors
                        (map compute-executor-capacity)
                        (map nil-to-zero))]
    (if (seq capacities)
      (apply max capacities)
      0)))
