;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
| |
| (ns backtype.storm.stats |
| (:import [backtype.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent |
| NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId |
| ClusterSummary TopologyInfo TopologySummary ExecutorInfo ExecutorSummary ExecutorStats |
| ExecutorSpecificStats SpoutStats BoltStats ErrorInfo |
| SupervisorSummary CommonAggregateStats ComponentAggregateStats |
| ComponentPageInfo ComponentType BoltAggregateStats |
| ExecutorAggregateStats SpecificAggregateStats |
| SpoutAggregateStats TopologyPageInfo TopologyStats]) |
| (:import [backtype.storm.utils Utils]) |
| (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]) |
| (:use [backtype.storm log util]) |
| (:use [clojure.math.numeric-tower :only [ceil]])) |
| |
(def TEN-MIN-IN-SECONDS
  "Number of seconds in ten minutes. Its string form is the window key that
  compute-agg-capacity looks up in the per-window stats."
  (* 60 10))
| |
;; Keys of the CommonStats fields that hold stat objects; these are the fields
;; that get closed and rendered. :rate is a plain value and is not listed.
(def COMMON-FIELDS [:emitted :transferred])

;; Stats kept for every executor: emitted and transferred counters plus the
;; rate by which the counters are incremented.
(defrecord CommonStats
  [^MultiCountStatAndMetric emitted
   ^MultiCountStatAndMetric transferred
   rate])
| |
;; Keys of the bolt-specific stat fields of BoltExecutorStats (everything
;; except :common).
(def BOLT-FIELDS
  [:acked :failed :process-latencies :executed :execute-latencies])

;; Stats of one bolt executor. acked and failed count individual tuples.
(defrecord BoltExecutorStats
  [^CommonStats common
   ^MultiCountStatAndMetric acked
   ^MultiCountStatAndMetric failed
   ^MultiLatencyStatAndMetric process-latencies
   ^MultiCountStatAndMetric executed
   ^MultiLatencyStatAndMetric execute-latencies])
| |
;; Keys of the spout-specific stat fields of SpoutExecutorStats (everything
;; except :common).
(def SPOUT-FIELDS
  [:acked :failed :complete-latencies])

;; Stats of one spout executor. acked and failed count tuple completion.
(defrecord SpoutExecutorStats
  [^CommonStats common
   ^MultiCountStatAndMetric acked
   ^MultiCountStatAndMetric failed
   ^MultiLatencyStatAndMetric complete-latencies])
| |
(def NUM-STAT-BUCKETS
  "Bucket count handed to every MultiCountStatAndMetric and
  MultiLatencyStatAndMetric constructed in this namespace."
  20)
| |
(defn- mk-common-stats
  "Builds a CommonStats record holding fresh emitted and transferred counters
  (NUM-STAT-BUCKETS buckets each) together with the given rate."
  [rate]
  (let [emitted (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
        transferred (MultiCountStatAndMetric. NUM-STAT-BUCKETS)]
    (->CommonStats emitted transferred rate)))
| |
(defn mk-bolt-stats
  "Builds a BoltExecutorStats record with fresh common stats (using `rate`)
  and a new counter or latency stat for each bolt-specific field."
  [rate]
  (let [new-counter #(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
        new-latency #(MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)]
    (->BoltExecutorStats (mk-common-stats rate)
                         (new-counter)     ; acked
                         (new-counter)     ; failed
                         (new-latency)     ; process-latencies
                         (new-counter)     ; executed
                         (new-latency))))  ; execute-latencies
| |
(defn mk-spout-stats
  "Builds a SpoutExecutorStats record with fresh common stats (using `rate`)
  and a new counter or latency stat for each spout-specific field."
  [rate]
  (let [new-counter #(MultiCountStatAndMetric. NUM-STAT-BUCKETS)]
    (->SpoutExecutorStats (mk-common-stats rate)
                          (new-counter)                                      ; acked
                          (new-counter)                                      ; failed
                          (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS))))   ; complete-latencies
| |
(defmacro stats-rate
  "Expands to a lookup of :rate inside the :common entry of `stats`."
  [stats]
  `(:rate (:common ~stats)))
| |
(defmacro stats-emitted
  "Expands to a lookup of :emitted inside the :common entry of `stats`."
  [stats]
  `(:emitted (:common ~stats)))
| |
(defmacro stats-transferred
  "Expands to a lookup of :transferred inside the :common entry of `stats`."
  [stats]
  `(:transferred (:common ~stats)))
| |
(defmacro stats-executed
  "Expands to a lookup of :executed in `stats`."
  [stats]
  (list :executed stats))
| |
(defmacro stats-acked
  "Expands to a lookup of :acked in `stats`."
  [stats]
  (list :acked stats))
| |
(defmacro stats-failed
  "Expands to a lookup of :failed in `stats`."
  [stats]
  (list :failed stats))
| |
(defmacro stats-execute-latencies
  "Expands to a lookup of :execute-latencies in `stats`."
  [stats]
  (list :execute-latencies stats))
| |
(defmacro stats-process-latencies
  "Expands to a lookup of :process-latencies in `stats`."
  [stats]
  (list :process-latencies stats))
| |
(defmacro stats-complete-latencies
  "Expands to a lookup of :complete-latencies in `stats`."
  [stats]
  (list :complete-latencies stats))
| |
(defn emitted-tuple!
  "Increments the emitted counter of `stats` for `stream` by the stats' rate.

  stats  - an executor stats record with a :common entry
  stream - key under which the count is recorded"
  [stats stream]
  ;; Bind the counter to a hinted local (the pattern bolt-execute-tuple! uses).
  ;; A type hint placed on a macro call form is not carried over to its
  ;; expansion, so hinting the (stats-emitted ...) form itself leaves the
  ;; .incBy call reflective.
  (let [^MultiCountStatAndMetric emitted (stats-emitted stats)]
    (.incBy emitted stream (stats-rate stats))))
| |
(defn transferred-tuples!
  "Increments the transferred counter of `stats` for `stream` by
  rate * amt.

  stats  - an executor stats record with a :common entry
  stream - key under which the count is recorded
  amt    - multiplier applied to the stats' rate"
  [stats stream amt]
  ;; Bind the counter to a hinted local (the pattern bolt-execute-tuple! uses).
  ;; A type hint placed on a macro call form is not carried over to its
  ;; expansion, so hinting the (stats-transferred ...) form itself leaves the
  ;; .incBy call reflective.
  (let [^MultiCountStatAndMetric transferred (stats-transferred stats)]
    (.incBy transferred stream (* (stats-rate stats) amt))))
| |
(defn bolt-execute-tuple!
  "Records one executed tuple for a bolt: bumps the executed counter for
  [component stream] by the stats' rate and records `latency-ms` in the
  execute-latencies stat under the same key."
  [^BoltExecutorStats stats component stream latency-ms]
  (let [source-key [component stream]
        ^MultiCountStatAndMetric executed-counter (stats-executed stats)
        ^MultiLatencyStatAndMetric latency-stat (stats-execute-latencies stats)]
    (.incBy executed-counter source-key (stats-rate stats))
    (.record latency-stat source-key latency-ms)))
| |
(defn bolt-acked-tuple!
  "Records one acked tuple for a bolt: bumps the acked counter for
  [component stream] by the stats' rate and records `latency-ms` in the
  process-latencies stat under the same key."
  [^BoltExecutorStats stats component stream latency-ms]
  (let [source-key [component stream]
        ^MultiCountStatAndMetric acked-counter (stats-acked stats)
        ^MultiLatencyStatAndMetric latency-stat (stats-process-latencies stats)]
    (.incBy acked-counter source-key (stats-rate stats))
    (.record latency-stat source-key latency-ms)))
| |
(defn bolt-failed-tuple!
  "Records one failed tuple for a bolt: bumps the failed counter for
  [component stream] by the stats' rate. `latency-ms` is accepted but not
  used."
  [^BoltExecutorStats stats component stream latency-ms]
  (let [source-key [component stream]
        ^MultiCountStatAndMetric failed-counter (stats-failed stats)]
    (.incBy failed-counter source-key (stats-rate stats))))
| |
(defn spout-acked-tuple!
  "Records one acked tuple for a spout: bumps the acked counter for `stream`
  by the stats' rate and records `latency-ms` in the complete-latencies stat
  under the same key."
  [^SpoutExecutorStats stats stream latency-ms]
  ;; Bind both stats to hinted locals (the pattern the bolt-*-tuple! functions
  ;; use). A type hint placed on a macro call form is not carried over to its
  ;; expansion, so hinting the accessor forms directly leaves the interop
  ;; calls reflective.
  (let [^MultiCountStatAndMetric acked-counter (stats-acked stats)
        ^MultiLatencyStatAndMetric latency-stat (stats-complete-latencies stats)]
    (.incBy acked-counter stream (stats-rate stats))
    (.record latency-stat stream latency-ms)))
| |
(defn spout-failed-tuple!
  "Records one failed tuple for a spout: bumps the failed counter for
  `stream` by the stats' rate. `latency-ms` is accepted but not used."
  [^SpoutExecutorStats stats stream latency-ms]
  ;; Bind the counter to a hinted local (the pattern the bolt-*-tuple!
  ;; functions use). A type hint placed on a macro call form is not carried
  ;; over to its expansion, so hinting the accessor form directly leaves the
  ;; .incBy call reflective.
  (let [^MultiCountStatAndMetric failed-counter (stats-failed stats)]
    (.incBy failed-counter stream (stats-rate stats))))
| |
(defn- cleanup-stat!
  "Invokes the no-argument `close` method on `stat`."
  [stat]
  (. stat close))
| |
(defn- cleanup-common-stats!
  "Closes every stat held under the COMMON-FIELDS keys of `stats`."
  [^CommonStats stats]
  (doseq [field COMMON-FIELDS
          :let [stat (field stats)]]
    (cleanup-stat! stat)))
| |
(defn cleanup-bolt-stats!
  "Closes the common stats of a bolt executor and then every stat held under
  the BOLT-FIELDS keys."
  [^BoltExecutorStats stats]
  (cleanup-common-stats! (:common stats))
  (doseq [field BOLT-FIELDS
          :let [stat (field stats)]]
    (cleanup-stat! stat)))
| |
(defn cleanup-spout-stats!
  "Closes the common stats of a spout executor and then every stat held under
  the SPOUT-FIELDS keys."
  [^SpoutExecutorStats stats]
  (cleanup-common-stats! (:common stats))
  (doseq [field SPOUT-FIELDS
          :let [stat (field stats)]]
    (cleanup-stat! stat)))
| |
(defn- value-stats
  "Returns a map from each key in `fields` to the current value of the stat
  stored under that key in `stats`: getTimeCounts for a
  MultiCountStatAndMetric, getTimeLatAvg for anything else (treated as a
  MultiLatencyStatAndMetric)."
  [stats fields]
  (letfn [(snapshot [stat]
            (if (instance? MultiCountStatAndMetric stat)
              (.getTimeCounts ^MultiCountStatAndMetric stat)
              (.getTimeLatAvg ^MultiLatencyStatAndMetric stat)))]
    (reduce (fn [acc field]
              (assoc acc field (snapshot (field stats))))
            {}
            fields)))
| |
(defn- value-common-stats
  "Returns the rendered COMMON-FIELDS values of `stats` plus its :rate."
  [^CommonStats stats]
  (assoc (value-stats stats COMMON-FIELDS)
         :rate (:rate stats)))
| |
(defn value-bolt-stats!
  "Closes all stats of a bolt executor, then returns one flat map containing
  the rendered common values, the rendered BOLT-FIELDS values and
  :type :bolt."
  [^BoltExecutorStats stats]
  (cleanup-bolt-stats! stats)
  (-> (value-common-stats (:common stats))
      (merge (value-stats stats BOLT-FIELDS))
      (assoc :type :bolt)))
| |
(defn value-spout-stats!
  "Closes all stats of a spout executor, then returns one flat map containing
  the rendered common values, the rendered SPOUT-FIELDS values and
  :type :spout."
  [^SpoutExecutorStats stats]
  (cleanup-spout-stats! stats)
  (-> (value-common-stats (:common stats))
      (merge (value-stats stats SPOUT-FIELDS))
      (assoc :type :spout)))
| |
;; Renders an executor stats record to a flat map of values. Dispatch is done
;; by class-selector (defined outside this file) on the stats argument.
(defmulti render-stats! class-selector)

;; Spout executors are rendered with value-spout-stats!.
(defmethod render-stats! SpoutExecutorStats
  [spout-stats]
  (value-spout-stats! spout-stats))

;; Bolt executors are rendered with value-bolt-stats!.
(defmethod render-stats! BoltExecutorStats
  [bolt-stats]
  (value-bolt-stats! bolt-stats))
| |
;; Converts rendered executor stats to their Thrift form; dispatches on the
;; :type entry of the stats map.
(defmulti thriftify-specific-stats :type)

;; Converts Thrift executor-specific stats back to Clojure data; dispatch is
;; done by class-selector (defined outside this file).
(defmulti clojurify-specific-stats class-selector)
| |
(defn window-set-converter
  "Rebuilds a two-level map, converting its keys.

  With three arguments, `first-key-fun` is applied to each top-level key and
  `key-fn` to each key of the nested maps. With two arguments only the
  top-level keys are converted and the nested keys are kept as they are.
  Values are left untouched."
  ([stats key-fn first-key-fun]
   (let [convert-inner (fn [inner]
                         (into {} (map (fn [[k v]] [(key-fn k) v]) inner)))]
     (into {}
           (map (fn [[outer-key inner]]
                  ;; first-key-fun is applied to the outer key only
                  [(first-key-fun outer-key) (convert-inner inner)])
                stats))))
  ([stats first-key-fun]
   (window-set-converter stats identity first-key-fun)))
| |
(defn to-global-stream-id
  "Builds a GlobalStreamId from a [component stream] pair."
  [[component stream]]
  (new GlobalStreamId component stream))
| |
(defn from-global-stream-id
  "Returns the [component-id stream-id] pair read from `global-stream-id`
  via its get_componentId and get_streamId methods."
  [global-stream-id]
  (let [component-id (.get_componentId global-stream-id)
        stream-id (.get_streamId global-stream-id)]
    [component-id stream-id]))
| |
;; Returns a vector [acked failed process-ms-avg executed execute-ms-avg] in
;; which the nested keys of each map have been converted with
;; from-global-stream-id; top-level keys are kept unchanged.
(defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
  (let [convert (fn [window->gsid->val]
                  (window-set-converter window->gsid->val
                                        from-global-stream-id
                                        identity))]
    (mapv convert
          [(.get_acked stats)
           (.get_failed stats)
           (.get_process_ms_avg stats)
           (.get_executed stats)
           (.get_execute_ms_avg stats)])))
| |
;; Returns a vector [acked failed complete-ms-avg] taken as-is from the
;; SpoutStats getters.
(defmethod clojurify-specific-stats SpoutStats [^SpoutStats stats]
  (vector (.get_acked stats)
          (.get_failed stats)
          (.get_complete_ms_avg stats)))
| |
| |
(defn clojurify-executor-stats
  "Converts a Thrift ExecutorStats into a flat map-like value: the common
  fields (emitted, transferred, rate), a :type of :bolt or :spout, and the
  bolt- or spout-specific fields, with the :common key removed."
  [^ExecutorStats stats]
  (let [specific (.get_specific stats)
        bolt? (.is_set_bolt specific)
        unwrapped (if bolt? (.get_bolt specific) (.get_spout specific))
        field-vals (clojurify-specific-stats unwrapped)
        common (CommonStats. (.get_emitted stats)
                             (.get_transferred stats)
                             (.get_rate stats))
        [type-kw ->record] (if bolt?
                             [:bolt ->BoltExecutorStats]
                             [:spout ->SpoutExecutorStats])]
    ;; The worker heartbeat does not store BoltExecutorStats or
    ;; SpoutExecutorStats; it stores what render-stats! returns, i.e. a
    ;; flattened map with the :common values merged into the top level.
    ;; The same shape is produced here. The record is built with nil in the
    ;; :common position, which is then dropped.
    (-> common
        (merge {:type type-kw}
               (apply ->record (into [nil] field-vals)))
        (dissoc :common))))
| |
;; Builds the Thrift bolt-specific stats: each stat map gets its top-level
;; keys converted with str and its nested keys with to-global-stream-id.
(defmethod thriftify-specific-stats :bolt
  [stats]
  (let [convert (fn [stat-key]
                  (window-set-converter (stat-key stats)
                                        to-global-stream-id
                                        str))]
    (ExecutorSpecificStats/bolt
      (BoltStats. (convert :acked)
                  (convert :failed)
                  (convert :process-latencies)
                  (convert :executed)
                  (convert :execute-latencies)))))
| |
;; Builds the Thrift spout-specific stats: each stat map gets its top-level
;; keys converted with str; nested keys are kept unchanged.
(defmethod thriftify-specific-stats :spout
  [stats]
  (let [convert (fn [stat-key]
                  (window-set-converter (stat-key stats) str))]
    (ExecutorSpecificStats/spout
      (SpoutStats. (convert :acked)
                   (convert :failed)
                   (convert :complete-latencies)))))
| |
(defn thriftify-executor-stats
  "Builds a Thrift ExecutorStats from rendered executor stats: the emitted
  and transferred maps with their top-level keys stringified, the
  type-specific stats from thriftify-specific-stats, and the rate."
  [stats]
  (let [specific (thriftify-specific-stats stats)
        emitted (window-set-converter (:emitted stats) str)
        transferred (window-set-converter (:transferred stats) str)]
    (ExecutorStats. emitted transferred specific (:rate stats))))
| |
(defn valid-number?
  "Returns true if x is a number that is not NaN or Infinity, false otherwise"
  [x]
  (if (number? x)
    (not (or (Double/isNaN x)
             (Double/isInfinite x)))
    false))
| |
(defn apply-default
  "Calls `f` with every element of `args` first passed through
  `defaulting-fn`."
  [f defaulting-fn & args]
  (->> args
       (map defaulting-fn)
       (apply f)))
| |
(defn apply-or-0
  "Calls `f` on `args` after replacing every argument that is not a
  valid-number? with 0."
  [f & args]
  (let [zero-if-invalid (fn [x] (if (valid-number? x) x 0))]
    (apply apply-default f zero-if-invalid args)))
| |
(defn sum-or-0
  "Adds the arguments via apply-or-0, so arguments that are not valid
  numbers count as 0."
  [& nums]
  (apply apply-or-0 + nums))
| |
(defn product-or-0
  "Multiplies the arguments via apply-or-0, so arguments that are not valid
  numbers count as 0."
  [& nums]
  (apply apply-or-0 * nums))
| |
(defn max-or-0
  "Returns the max of the arguments via apply-or-0, so arguments that are
  not valid numbers count as 0."
  [& nums]
  (apply apply-or-0 max nums))
| |
(defn- agg-bolt-lat-and-count
  "Aggregates number executed, process latency, and execute latency across all
  streams. Each average is weighted by the executed count stored under the
  same key before being summed."
  [idk->exec-avg idk->proc-avg idk->num-executed]
  (let [weighted-total (fn [idk->avg]
                         (sum (for [[id avg] idk->avg]
                                (product-or-0 avg (get idk->num-executed id)))))]
    {:executeLatencyTotal (weighted-total idk->exec-avg)
     :processLatencyTotal (weighted-total idk->proc-avg)
     :executed (sum (vals idk->num-executed))}))
| |
(defn- agg-spout-lat-and-count
  "Aggregates number acked and complete latencies across all streams. Each
  average is weighted by the acked count stored under the same key before
  being summed."
  [sid->comp-avg sid->num-acked]
  (let [weighted (for [[id avg] sid->comp-avg]
                   (product-or-0 avg (get sid->num-acked id)))]
    {:completeLatencyTotal (sum weighted)
     :acked (sum (vals sid->num-acked))}))
| |
(defn add-pairs
  "Element-wise sum of two pairs. With no arguments returns [0 0]."
  ([] [0 0])
  ([left right]
   (let [[l1 l2] left
         [r1 r2] right]
     (vector (+ l1 r1) (+ l2 r2)))))
| |
(defn mk-include-sys-fn
  "Returns a one-argument predicate. When `include-sys?` is truthy it accepts
  everything; otherwise it accepts only strings for which
  Utils/isSystemId is false."
  [include-sys?]
  (if-not include-sys?
    (fn [stream]
      (if (string? stream)
        (not (Utils/isSystemId stream))
        false))
    (fn [_] true)))
| |
(defn mk-include-sys-filter
  "Returns a function that includes or excludes map entries whose keys are
  system ids."
  [include-sys?]
  (if-not include-sys?
    (let [non-system-key? (mk-include-sys-fn false)]
      (partial filter-key non-system-key?))
    identity))
| |
(defn- agg-bolt-streams-lat-and-count
  "Aggregates number executed and process & execute latencies.

  For every key of `idk->exec-avg`, returns a map entry
  key -> {:executeLatencyTotal, :processLatencyTotal, :executed}, where each
  total is the corresponding average multiplied by the executed count for
  that key (via product-or-0)."
  [idk->exec-avg idk->proc-avg idk->executed]
  ;; Counts are looked up with `get`, as agg-spout-streams-lat-and-count
  ;; does, instead of calling the map as a function: invoking a nil
  ;; idk->executed would throw a NullPointerException, while `get` yields
  ;; nil, which product-or-0 already treats as 0.
  (letfn [(weight-avg [id avg]
            (product-or-0 avg (get idk->executed id)))]
    (into {}
          (for [k (keys idk->exec-avg)]
            [k {:executeLatencyTotal (weight-avg k (get idk->exec-avg k))
                :processLatencyTotal (weight-avg k (get idk->proc-avg k))
                :executed (get idk->executed k)}]))))
| |
(defn- agg-spout-streams-lat-and-count
  "Aggregates number acked and complete latencies. For every key of
  `idk->comp-avg`, returns key -> {:completeLatencyTotal, :acked}, the total
  being the average multiplied by the acked count for that key."
  [idk->comp-avg idk->acked]
  (reduce (fn [acc id]
            (let [num-acked (get idk->acked id)]
              (assoc acc id
                     {:completeLatencyTotal (product-or-0 (get idk->comp-avg id)
                                                          num-acked)
                      :acked num-acked})))
          {}
          (keys idk->comp-avg)))
| |
(defn swap-map-order
  "For a nested map, rearrange data such that the top-level keys become the
  nested map's keys and vice versa.
  Example:
  {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
  -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
  [m]
  (let [invert-entry (fn [[outer-k inner-m]]
                       ;; {outer {inner v}} -> {inner {outer v}} for one entry
                       (reduce (fn [acc [inner-k v]]
                                 (assoc acc inner-k {outer-k v}))
                               {}
                               inner-m))]
    (apply merge-with merge (map invert-entry m))))
| |
(defn- compute-agg-capacity
  "Computes the capacity metric for one executor given its heartbeat data and
  uptime.

  m      - stats map with :execute-latencies and :executed entries, each
           looked up under the ten-minute window key (the string form of
           TEN-MIN-IN-SECONDS)
  uptime - executor uptime

  Returns the sum over streams of (avg execute latency * executed count)
  divided by 1000 * min(uptime, TEN-MIN-IN-SECONDS), or nil when uptime is
  nil or not positive."
  [m uptime]
  ;; A zero uptime would make the divisor an integer zero, and Clojure's `/`
  ;; on boxed operands throws ArithmeticException for that; treat it like a
  ;; missing uptime instead.
  (when (and uptime (pos? uptime))
    (let [window-key (str TEN-MIN-IN-SECONDS)]
      (->>
        ;; For each stream, create weighted averages and counts.
        ;; NOTE(review): assumes both maps carry the same stream keys; a key
        ;; present in only one map is passed through unpaired by merge-with.
        (merge-with (fn weighted-avg+count-fn
                      [avg cnt]
                      [(* avg cnt) cnt])
                    (get (:execute-latencies m) window-key)
                    (get (:executed m) window-key))
        vals ;; Ignore the stream ids.
        (reduce add-pairs
                [0. 0]) ;; Combine weighted averages and counts.
        ((fn [[weighted-avg cnt]]
           (div weighted-avg (* 1000 (min uptime TEN-MIN-IN-SECONDS)))))))))
| |
(defn agg-pre-merge-comp-page-bolt
  "Turns one bolt executor's data into the per-executor map used on the
  component page, restricted to `window`.

  Input stats (:cid+sid->input-stats) combine acked/failed with the weighted
  latency totals and executed counts; output stats (:sid->output-stats) hold
  emitted/transferred, filtered according to `include-sys?`."
  [{:keys [exec-id host port uptime num-tasks stats]}
   window
   include-sys?]
  (let [drop-sys (mk-include-sys-filter include-sys?)
        ;; Stat map for `stat-key` with window keys stringified, narrowed to
        ;; the requested window.
        in-window (fn [stat-key]
                    (get (map-key str (get stats stat-key)) window))]
    {:executor-id exec-id
     :host host
     :port port
     :uptime uptime
     :num-executors 1
     :num-tasks num-tasks
     :capacity (compute-agg-capacity stats uptime)
     :cid+sid->input-stats
     (merge-with merge
                 (swap-map-order {:acked (in-window :acked)
                                  :failed (in-window :failed)})
                 (agg-bolt-streams-lat-and-count (in-window :execute-latencies)
                                                 (in-window :process-latencies)
                                                 (in-window :executed)))
     :sid->output-stats
     (swap-map-order {:emitted (drop-sys (in-window :emitted))
                      :transferred (drop-sys (in-window :transferred))})}))
| |
(defn agg-pre-merge-comp-page-spout
  "Turns one spout executor's data into the per-executor map used on the
  component page, restricted to `window`.

  :sid->output-stats combines the weighted complete-latency totals with the
  acked, failed, emitted and transferred counts; emitted and transferred are
  filtered according to `include-sys?`."
  [{:keys [exec-id host port uptime num-tasks stats]}
   window
   include-sys?]
  (let [drop-sys (mk-include-sys-filter include-sys?)
        ;; Stat map for `stat-key` with window keys stringified, narrowed to
        ;; the requested window.
        in-window (fn [stat-key]
                    (get (map-key str (get stats stat-key)) window))]
    {:executor-id exec-id
     :host host
     :port port
     :uptime uptime
     :num-executors 1
     :num-tasks num-tasks
     :sid->output-stats
     (merge-with merge
                 (agg-spout-streams-lat-and-count (in-window :complete-latencies)
                                                  (in-window :acked))
                 (swap-map-order {:acked (in-window :acked)
                                  :failed (in-window :failed)
                                  :emitted (drop-sys (in-window :emitted))
                                  :transferred (drop-sys (in-window :transferred))}))}))
| |
(defn agg-pre-merge-topo-page-bolt
  "Turns one bolt executor's data into a single-entry map
  {comp-id aggregated-stats} for the topology page, restricted to `window`.
  Emitted and transferred are filtered according to `include-sys?` before
  being summed over streams."
  [{:keys [comp-id num-tasks stats uptime]}
   window
   include-sys?]
  (let [drop-sys (mk-include-sys-filter include-sys?)
        ;; Stat map for `stat-key` with window keys stringified, narrowed to
        ;; the requested window.
        in-window (fn [stat-key]
                    (get (map-key str (get stats stat-key)) window))
        total (fn [id->num] (sum (vals id->num)))]
    {comp-id
     (merge
       (agg-bolt-lat-and-count (in-window :execute-latencies)
                               (in-window :process-latencies)
                               (in-window :executed))
       {:num-executors 1
        :num-tasks num-tasks
        :emitted (total (drop-sys (in-window :emitted)))
        :transferred (total (drop-sys (in-window :transferred)))
        :capacity (compute-agg-capacity stats uptime)
        :acked (total (in-window :acked))
        :failed (total (in-window :failed))})}))
| |
(defn agg-pre-merge-topo-page-spout
  "Turns one spout executor's data into a single-entry map
  {comp-id aggregated-stats} for the topology page, restricted to `window`.
  Emitted and transferred are filtered according to `include-sys?` before
  being summed over streams."
  [{:keys [comp-id num-tasks stats]}
   window
   include-sys?]
  (let [drop-sys (mk-include-sys-filter include-sys?)
        ;; Stat map for `stat-key` with window keys stringified, narrowed to
        ;; the requested window.
        in-window (fn [stat-key]
                    (get (map-key str (get stats stat-key)) window))
        total (fn [sid->num] (sum (vals sid->num)))]
    {comp-id
     (merge
       (agg-spout-lat-and-count (in-window :complete-latencies)
                                (in-window :acked))
       {:num-executors 1
        :num-tasks num-tasks
        :emitted (total (drop-sys (in-window :emitted)))
        :transferred (total (drop-sys (in-window :transferred)))
        :failed (total (in-window :failed))})}))
| |
(defn merge-agg-comp-stats-comp-page-bolt
  "Merges one bolt executor's component-page stats into the accumulator.

  Executor and task counts are increased, the per-stream input and output
  stats are summed key by key, and a summary of this executor (totals over
  its streams plus average execute/process latency, nil when nothing was
  executed) is conj'ed onto :executor-stats."
  [{acc-in :cid+sid->input-stats
    acc-out :sid->output-stats
    :as acc-bolt-stats}
   {bolt-in :cid+sid->input-stats
    bolt-out :sid->output-stats
    :as bolt-stats}]
  (let [merge-streams (fn [acc new]
                        (merge-with (partial merge-with sum-or-0) acc new))
        stream-total (fn [id->stats k]
                       (apply sum-or-0 (map k (vals id->stats))))
        executed (stream-total bolt-in :executed)
        any-executed? (and executed (pos? executed))
        avg-latency (fn [total-key]
                      (when any-executed?
                        (div (stream-total bolt-in total-key) executed)))
        executor-summary (merge
                           (select-keys bolt-stats
                                        [:executor-id :uptime :host :port :capacity])
                           {:emitted (stream-total bolt-out :emitted)
                            :transferred (stream-total bolt-out :transferred)
                            :acked (stream-total bolt-in :acked)
                            :failed (stream-total bolt-in :failed)
                            :executed executed}
                           {:execute-latency (avg-latency :executeLatencyTotal)
                            :process-latency (avg-latency :processLatencyTotal)})]
    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
     :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
     :sid->output-stats (merge-streams acc-out bolt-out)
     :cid+sid->input-stats (merge-streams acc-in bolt-in)
     :executor-stats (conj (:executor-stats acc-bolt-stats) executor-summary)}))
| |
(defn merge-agg-comp-stats-comp-page-spout
  "Merges one spout executor's component-page stats into the accumulator.

  Executor and task counts are increased, the per-stream output stats are
  summed key by key, and a summary of this executor (totals over its streams
  plus average complete latency, nil when nothing was acked) is conj'ed onto
  :executor-stats."
  [{acc-out :sid->output-stats
    :as acc-spout-stats}
   {spout-out :sid->output-stats
    :as spout-stats}]
  (let [stream-total (fn [sid->stats k]
                       (apply sum-or-0 (map k (vals sid->stats))))
        acked (stream-total spout-out :acked)
        complete-latency (when (and acked (pos? acked))
                           (div (stream-total spout-out :completeLatencyTotal)
                                acked))
        executor-summary (merge
                           (select-keys spout-stats
                                        [:executor-id :uptime :host :port])
                           {:emitted (stream-total spout-out :emitted)
                            :transferred (stream-total spout-out :transferred)
                            :acked acked
                            :failed (stream-total spout-out :failed)}
                           {:complete-latency complete-latency})]
    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
     :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats))
     :sid->output-stats (merge-with (partial merge-with sum-or-0)
                                    acc-out
                                    spout-out)
     :executor-stats (conj (:executor-stats acc-spout-stats) executor-summary)}))
| |
(defn merge-agg-comp-stats-topo-page-bolt
  "Merges one bolt executor's topology-page stats into the accumulator:
  counts and latency totals are summed, :capacity takes the maximum, and
  :num-executors is increased by one."
  [acc-bolt-stats bolt-stats]
  (let [summed-keys [:num-tasks :emitted :transferred
                     ;; We sum average latency totals here to avoid dividing
                     ;; at each step. Compute the average latencies by
                     ;; dividing the total by the count.
                     :executeLatencyTotal :processLatencyTotal
                     :executed :acked :failed]
        summed (into {}
                     (for [k summed-keys]
                       [k (sum-or-0 (k acc-bolt-stats) (k bolt-stats))]))]
    (assoc summed
           :num-executors (inc (or (:num-executors acc-bolt-stats) 0))
           :capacity (max-or-0 (:capacity acc-bolt-stats)
                               (:capacity bolt-stats)))))
| |
(defn merge-agg-comp-stats-topo-page-spout
  "Merges one spout executor's topology-page stats into the accumulator:
  counts and the complete-latency total are summed and :num-executors is
  increased by one."
  [acc-spout-stats spout-stats]
  (let [summed-keys [:num-tasks :emitted :transferred
                     ;; We sum average latency totals here to avoid dividing
                     ;; at each step. Compute the average latencies by
                     ;; dividing the total by the count.
                     :completeLatencyTotal
                     :acked :failed]
        summed (into {}
                     (for [k summed-keys]
                       [k (sum-or-0 (k acc-spout-stats) (k spout-stats))]))]
    (assoc summed
           :num-executors (inc (or (:num-executors acc-spout-stats) 0)))))
| |
(defn aggregate-count-streams
  "Replaces each value of `stats` (itself a map of counts) with the sum of
  that map's values."
  [stats]
  (map-val (fn [id->count] (apply + (vals id->count)))
           stats))
| |
(defn- agg-topo-exec-stats*
  "A helper function that does the common work to aggregate stats of one
  executor with the given map for the topology page.

  window, include-sys? - passed on to `pre-merge-fn`; include-sys? also
                         controls filtering of emitted/transferred
  acc-stats            - accumulator map
  new-data             - data of one executor; its :stats, :host and :port
                         are read here
  pre-merge-fn         - turns new-data into {comp-id stats}
  merge-fn             - combines accumulated and new per-component stats
  comp-key             - accumulator key the per-component stats live under

  Returns the updated accumulator, with :type set to the executor's type."
  [window
   include-sys?
   {:keys [workers-set
           bolt-id->stats
           spout-id->stats
           window->emitted
           window->transferred
           window->comp-lat-wgt-avg
           window->acked
           window->failed] :as acc-stats}
   {:keys [stats] :as new-data}
   pre-merge-fn
   merge-fn
   comp-key]
  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
        {w->compLatWgtAvg :completeLatencyTotal
         w->acked :acked}
        (if (:complete-latencies stats)
          (swap-map-order
            (into {}
                  (for [w (keys (:acked stats))]
                    [w (agg-spout-lat-and-count
                         (get (:complete-latencies stats) w)
                         (get (:acked stats) w))])))
          ;; The fallback must use the same :acked key the destructuring
          ;; above reads; under a different key the aggregated counts were
          ;; computed but never bound to w->acked.
          {:completeLatencyTotal nil
           :acked (aggregate-count-streams (:acked stats))})
        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
    (assoc {:workers-set (conj workers-set
                               [(:host new-data) (:port new-data)])
            :bolt-id->stats bolt-id->stats
            :spout-id->stats spout-id->stats
            :window->emitted (->> (:emitted stats)
                                  (map-val handle-sys-components-fn)
                                  aggregate-count-streams
                                  (merge-with + window->emitted))
            :window->transferred (->> (:transferred stats)
                                      (map-val handle-sys-components-fn)
                                      aggregate-count-streams
                                      (merge-with + window->transferred))
            :window->comp-lat-wgt-avg (merge-with +
                                                  window->comp-lat-wgt-avg
                                                  w->compLatWgtAvg)
            ;; Topology-level acked and failed only count spout executors.
            :window->acked (if (= :spout (:type stats))
                             (merge-with + window->acked w->acked)
                             window->acked)
            :window->failed (if (= :spout (:type stats))
                              (->> (:failed stats)
                                   aggregate-count-streams
                                   (merge-with + window->failed))
                              window->failed)}
           comp-key (merge-with merge-fn
                                (acc-stats comp-key)
                                cid->statk->num)
           :type (:type stats))))
| |
(defmulti agg-topo-exec-stats
  "Combines the aggregate stats of one executor with the given map, selecting
  the appropriate window and including system components as specified.
  Dispatches on the :type of the last argument."
  (fn dispatch-fn [& args] (-> args last :type)))

;; Bolt executors are pre-merged and merged with the bolt topology-page
;; functions and accumulated under :bolt-id->stats.
(defmethod agg-topo-exec-stats :bolt
  [window include-sys? acc-stats new-data]
  (agg-topo-exec-stats* window include-sys? acc-stats new-data
                        agg-pre-merge-topo-page-bolt
                        merge-agg-comp-stats-topo-page-bolt
                        :bolt-id->stats))

;; Spout executors are pre-merged and merged with the spout topology-page
;; functions and accumulated under :spout-id->stats.
(defmethod agg-topo-exec-stats :spout
  [window include-sys? acc-stats new-data]
  (agg-topo-exec-stats* window include-sys? acc-stats new-data
                        agg-pre-merge-topo-page-spout
                        merge-agg-comp-stats-topo-page-spout
                        :spout-id->stats))

;; Any other type leaves the accumulator untouched.
(defmethod agg-topo-exec-stats :default
  [_window _include-sys? acc-stats _new-data]
  acc-stats)
| |
(defn get-last-error
  "Returns an ErrorInfo built from the :error and :time-secs of the last
  error that `storm-cluster-state` reports for the component, or nil when it
  reports none."
  [storm-cluster-state storm-id component-id]
  (when-let [last-err (.last-error storm-cluster-state storm-id component-id)]
    (ErrorInfo. (:error last-err) (:time-secs last-err))))
| |
(defn component-type
  "Returns the component type (either :bolt or :spout) for a given
  topology and component id. Returns nil if not found. System ids are
  reported as :bolt."
  [^StormTopology topology id]
  (let [bolt-map (.get_bolts topology)
        spout-map (.get_spouts topology)]
    (cond
      (or (Utils/isSystemId id)
          (.containsKey bolt-map id)) :bolt
      (.containsKey spout-map id) :spout
      :else nil)))
| |
(defn extract-nodeinfos-from-hb-for-comp
  "Returns the distinct {:host :port} maps of the executors whose component
  (looked up from the executor's first task) matches `comp-id`, or of all
  executors when `comp-id` is nil. System components are skipped unless
  `include-sys?` is truthy."
  ([exec->host+port task->component include-sys? comp-id]
   (let [wanted? (fn [id]
                   (and (or (nil? comp-id) (= comp-id id))
                        (or include-sys? (not (Utils/isSystemId id)))))]
     (->> exec->host+port
          (filter (fn [[[start] _]]
                    (wanted? (task->component start))))
          (map (fn [[_ [host port]]]
                 {:host host
                  :port port}))
          distinct))))
| |
(defn extract-data-from-hb
  "Returns one map per executor with its id, component id, task count, host,
  port, and the uptime and stats from its heartbeat. :type comes from the
  heartbeat stats when present, otherwise from component-type.

  Only executors of `comp-id` are returned (all of them when `comp-id` is nil
  or omitted); system components are skipped unless `include-sys?` is
  truthy."
  ([exec->host+port task->component beats include-sys? topology comp-id]
   (let [selected? (fn [id]
                     (and (or (nil? comp-id) (= comp-id id))
                          (or include-sys? (not (Utils/isSystemId id)))))]
     (for [[executor [host port]] exec->host+port
           :let [[start end] executor
                 beat (beats executor)
                 id (task->component start)]
           :when (selected? id)]
       (let [hb-stats (:stats beat)]
         {:exec-id executor
          :comp-id id
          :num-tasks (count (range start (inc end)))
          :host host
          :port port
          :uptime (:uptime beat)
          :stats hb-stats
          :type (or (:type hb-stats)
                    (component-type topology id))}))))
  ([exec->host+port task->component beats include-sys? topology]
   (extract-data-from-hb exec->host+port
                         task->component
                         beats
                         include-sys?
                         topology
                         nil)))
| |
(defn aggregate-topo-stats
  "Reduces the per-executor `data` into one accumulator map with
  agg-topo-exec-stats, starting from empty collections."
  [window include-sys? data]
  (let [empty-acc {:workers-set #{}
                   :bolt-id->stats {}
                   :spout-id->stats {}
                   :window->emitted {}
                   :window->transferred {}
                   :window->comp-lat-wgt-avg {}
                   :window->acked {}
                   :window->failed {}}]
    (reduce (fn [acc exec-data]
              (agg-topo-exec-stats window include-sys? acc exec-data))
            empty-acc
            data)))
| |
(defn- compute-weighted-averages-per-window
  "For every window under `wgt-avg-key` in `acc-data`, divides the weighted
  total by the value stored for the same window under `divisor-key`. Windows
  whose divisor is missing or not positive are left out. Result keys are the
  windows converted with str."
  [acc-data wgt-avg-key divisor-key]
  (reduce (fn [acc [window wgt-avg]]
            (let [divisor ((divisor-key acc-data) window)]
              (if (and divisor (pos? divisor))
                (assoc acc (str window) (div wgt-avg divisor))
                acc)))
          {}
          (wgt-avg-key acc-data)))
| |
(defn- post-aggregate-topo-stats
  "Finishes the topology-page aggregation: adds task/worker/executor counts,
  replaces the latency totals of each bolt and spout with averages (0 when
  the count is missing or not positive), attaches :lastError from
  `last-err-fn`, and stringifies the window keys of the per-window maps."
  [task->component exec->node+port last-err-fn acc-data]
  (let [str-windows (fn [k] (map-key str (k acc-data)))
        finish-bolt (fn [id m]
                      (let [executed (:executed m)
                            avg-of (fn [total-key]
                                     (if (and executed (pos? executed))
                                       (div (or (total-key m) 0) executed)
                                       0))]
                        (-> m
                            (assoc :execute-latency (avg-of :executeLatencyTotal)
                                   :process-latency (avg-of :processLatencyTotal))
                            (dissoc :executeLatencyTotal
                                    :processLatencyTotal)
                            (assoc :lastError (last-err-fn id)))))
        finish-spout (fn [id m]
                       (let [acked (:acked m)]
                         (-> m
                             (assoc :complete-latency
                                    (if (and acked (pos? acked))
                                      (div (:completeLatencyTotal m) acked)
                                      0))
                             (dissoc :completeLatencyTotal)
                             (assoc :lastError (last-err-fn id)))))
        finish-all (fn [finish id->stats]
                     (into {}
                           (map (fn [[id m]] [id (finish id m)])
                                id->stats)))]
    {:num-tasks (count task->component)
     :num-workers (count (:workers-set acc-data))
     :num-executors (count exec->node+port)
     :bolt-id->stats (finish-all finish-bolt (:bolt-id->stats acc-data))
     :spout-id->stats (finish-all finish-spout (:spout-id->stats acc-data))
     :window->emitted (str-windows :window->emitted)
     :window->transferred (str-windows :window->transferred)
     :window->complete-latency
     (compute-weighted-averages-per-window acc-data
                                           :window->comp-lat-wgt-avg
                                           :window->acked)
     :window->acked (str-windows :window->acked)
     :window->failed (str-windows :window->failed)}))
| |
(defn- thriftify-common-agg-stats
  "Creates a CommonAggregateStats, sets each field whose value in the stats
  map is truthy, and stores it on `s` via set_common_stats."
  [^ComponentAggregateStats s
   {:keys [num-tasks
           emitted
           transferred
           acked
           failed
           num-executors] :as statk->num}]
  (let [common (CommonAggregateStats.)]
    (when num-executors (.set_num_executors common num-executors))
    (when num-tasks (.set_num_tasks common num-tasks))
    (when emitted (.set_emitted common emitted))
    (when transferred (.set_transferred common transferred))
    (when acked (.set_acked common acked))
    (when failed (.set_failed common failed))
    (.set_common_stats s common)))
| |
(defn thriftify-bolt-agg-stats
  "Builds a ComponentAggregateStats of type BOLT from an aggregated stats
  map. The last error, the common stats and the bolt-specific fields
  (execute/process latency, executed, capacity) are set only when their
  values are truthy."
  [statk->num]
  (let [{:keys [lastError
                execute-latency
                process-latency
                executed
                capacity]} statk->num
        agg (ComponentAggregateStats.)
        bolt-specific (BoltAggregateStats.)]
    (.set_type agg ComponentType/BOLT)
    (when lastError (.set_last_error agg lastError))
    (thriftify-common-agg-stats agg statk->num)
    (when execute-latency (.set_execute_latency_ms bolt-specific execute-latency))
    (when process-latency (.set_process_latency_ms bolt-specific process-latency))
    (when executed (.set_executed bolt-specific executed))
    (when capacity (.set_capacity bolt-specific capacity))
    (.set_specific_stats agg (SpecificAggregateStats/bolt bolt-specific))
    agg))
| |
(defn thriftify-spout-agg-stats
  "Builds a ComponentAggregateStats of type SPOUT from an aggregated stats
  map. The last error, the common stats and the complete latency are set
  only when their values are truthy."
  [statk->num]
  (let [{:keys [lastError
                complete-latency]} statk->num
        agg (ComponentAggregateStats.)
        spout-specific (SpoutAggregateStats.)]
    (.set_type agg ComponentType/SPOUT)
    (when lastError (.set_last_error agg lastError))
    (thriftify-common-agg-stats agg statk->num)
    (when complete-latency (.set_complete_latency_ms spout-specific complete-latency))
    (.set_specific_stats agg (SpecificAggregateStats/spout spout-specific))
    agg))
| |
(defn thriftify-topo-page-data
  "Builds a TopologyPageInfo for `topology-id` from the post-aggregated
  `data`: counts, per-spout and per-bolt aggregate stats, and the per-window
  topology stats."
  [topology-id data]
  (let [{:keys [num-tasks
                num-workers
                num-executors
                spout-id->stats
                bolt-id->stats
                window->emitted
                window->transferred
                window->complete-latency
                window->acked
                window->failed]} data
        ;; Tags every stats map with its type and converts it to Thrift.
        agg-stats-by-id (fn [id->stats type-kw thriftify-fn]
                          (into {}
                                (map (fn [[id m]]
                                       [id (thriftify-fn (assoc m :type type-kw))])
                                     id->stats)))
        spout-agg-stats (agg-stats-by-id spout-id->stats
                                         :spout
                                         thriftify-spout-agg-stats)
        bolt-agg-stats (agg-stats-by-id bolt-id->stats
                                        :bolt
                                        thriftify-bolt-agg-stats)
        topology-stats (TopologyStats.)
        topo-page-info (TopologyPageInfo. topology-id)]
    (doto topology-stats
      (.set_window_to_emitted window->emitted)
      (.set_window_to_transferred window->transferred)
      (.set_window_to_complete_latencies_ms window->complete-latency)
      (.set_window_to_acked window->acked)
      (.set_window_to_failed window->failed))
    (doto topo-page-info
      (.set_num_tasks num-tasks)
      (.set_num_workers num-workers)
      (.set_num_executors num-executors)
      (.set_id_to_spout_agg_stats spout-agg-stats)
      (.set_id_to_bolt_agg_stats bolt-agg-stats)
      (.set_topology_stats topology-stats))))
| |
(defn agg-topo-execs-stats
  "Aggregate various executor statistics for a topology from the given
  heartbeats.

  Runs four stages in order, each fed the previous stage's result as its last
  argument: extract-data-from-hb, aggregate-topo-stats,
  post-aggregate-topo-stats and thriftify-topo-page-data. Returns whatever
  thriftify-topo-page-data returns."
  [topology-id
   exec->node+port
   task->component
   beats
   topology
   window
   include-sys?
   last-err-fn]
  ;; Original author's note: this iterates over each executor one time,
  ;; because of lazy evaluation.
  (let [hb-data (extract-data-from-hb exec->node+port
                                      task->component
                                      beats
                                      include-sys?
                                      topology)
        aggregated (aggregate-topo-stats window include-sys? hb-data)
        finalized (post-aggregate-topo-stats task->component
                                             exec->node+port
                                             last-err-fn
                                             aggregated)]
    (thriftify-topo-page-data topology-id finalized)))
| |
(defn- agg-bolt-exec-win-stats
  "A helper function that aggregates windowed stats from one bolt executor.

  acc-stats holds the running :window->* maps; new-stats holds one executor's
  :emitted, :transferred, :acked, :failed, :executed, :execute-latencies and
  :process-latencies. Returns a new map of :window->* entries in which each
  accumulated map has been combined with the executor's contribution via
  (merge-with + ...). Only :emitted and :transferred pass through the filter
  produced by mk-include-sys-filter."
  [acc-stats new-stats include-sys?]
  (let [win->executed (:executed new-stats)
        ;; One agg-bolt-lat-and-count result per window present in :executed.
        win->lat+count (into {}
                             (map (fn [w]
                                    [w (agg-bolt-lat-and-count
                                         (get (:execute-latencies new-stats) w)
                                         (get (:process-latencies new-stats) w)
                                         (get win->executed w))])
                                  (keys win->executed)))
        {exec-lat-totals :executeLatencyTotal
         proc-lat-totals :processLatencyTotal
         executed-counts :executed} (swap-map-order win->lat+count)
        sys-filter (mk-include-sys-filter include-sys?)
        add-to-acc (fn [acc-key addend]
                     (merge-with + (acc-key acc-stats) addend))
        filtered-counts (fn [stat-key]
                          (aggregate-count-streams
                            (map-val sys-filter (stat-key new-stats))))
        plain-counts (fn [stat-key]
                       (aggregate-count-streams (stat-key new-stats)))]
    {:window->emitted (add-to-acc :window->emitted
                                  (filtered-counts :emitted))
     :window->transferred (add-to-acc :window->transferred
                                      (filtered-counts :transferred))
     :window->exec-lat-wgt-avg (add-to-acc :window->exec-lat-wgt-avg
                                           exec-lat-totals)
     :window->proc-lat-wgt-avg (add-to-acc :window->proc-lat-wgt-avg
                                           proc-lat-totals)
     :window->executed (add-to-acc :window->executed executed-counts)
     :window->acked (add-to-acc :window->acked (plain-counts :acked))
     :window->failed (add-to-acc :window->failed (plain-counts :failed))}))
| |
(defn- agg-spout-exec-win-stats
  "A helper function that aggregates windowed stats from one spout executor.

  acc-stats holds the running :window->* maps; new-stats holds one executor's
  :emitted, :transferred, :acked, :failed and :complete-latencies. Returns a
  new map of :window->* entries in which each accumulated map has been
  combined with the executor's contribution via (merge-with + ...). Only
  :emitted and :transferred pass through the filter produced by
  mk-include-sys-filter."
  [acc-stats new-stats include-sys?]
  (let [win->acked (:acked new-stats)
        ;; One agg-spout-lat-and-count result per window present in :acked.
        win->lat+count (into {}
                             (map (fn [w]
                                    [w (agg-spout-lat-and-count
                                         (get (:complete-latencies new-stats) w)
                                         (get win->acked w))])
                                  (keys win->acked)))
        ;; Only :completeLatencyTotal is used below; the original also bound
        ;; the :acked entry but never read it.
        {comp-lat-totals :completeLatencyTotal} (swap-map-order win->lat+count)
        sys-filter (mk-include-sys-filter include-sys?)
        add-to-acc (fn [acc-key addend]
                     (merge-with + (acc-key acc-stats) addend))
        filtered-counts (fn [stat-key]
                          (aggregate-count-streams
                            (map-val sys-filter (stat-key new-stats))))
        plain-counts (fn [stat-key]
                       (aggregate-count-streams (stat-key new-stats)))]
    {:window->emitted (add-to-acc :window->emitted
                                  (filtered-counts :emitted))
     :window->transferred (add-to-acc :window->transferred
                                      (filtered-counts :transferred))
     :window->comp-lat-wgt-avg (add-to-acc :window->comp-lat-wgt-avg
                                           comp-lat-totals)
     :window->acked (add-to-acc :window->acked (plain-counts :acked))
     :window->failed (add-to-acc :window->failed (plain-counts :failed))}))
| |
(defmulti agg-comp-exec-stats
  "Combines the aggregate stats of one executor with the given map, selecting
  the appropriate window and including system components as specified.

  Takes four arguments and dispatches on the :type of the third one (the
  accumulated stats map)."
  (fn dispatch-fn [_window _include-sys? acc-stats _new-data]
    (:type acc-stats)))
| |
(defmethod agg-comp-exec-stats :bolt
  [window include-sys? acc-stats new-data]
  ;; The windowed totals come from agg-bolt-exec-win-stats; the :stats entry
  ;; is the accumulated :stats merged with this executor's pre-merged data.
  (let [win-stats (agg-bolt-exec-win-stats acc-stats
                                           (:stats new-data)
                                           include-sys?)
        page-stats (merge-agg-comp-stats-comp-page-bolt
                     (:stats acc-stats)
                     (agg-pre-merge-comp-page-bolt new-data
                                                   window
                                                   include-sys?))]
    (-> win-stats
        (assoc :stats page-stats)
        (assoc :type :bolt))))
| |
(defmethod agg-comp-exec-stats :spout
  [window include-sys? acc-stats new-data]
  ;; The windowed totals come from agg-spout-exec-win-stats; the :stats entry
  ;; is the accumulated :stats merged with this executor's pre-merged data.
  (let [win-stats (agg-spout-exec-win-stats acc-stats
                                            (:stats new-data)
                                            include-sys?)
        page-stats (merge-agg-comp-stats-comp-page-spout
                     (:stats acc-stats)
                     (agg-pre-merge-comp-page-spout new-data
                                                    window
                                                    include-sys?))]
    (-> win-stats
        (assoc :stats page-stats)
        (assoc :type :spout))))
| |
(defn- aggregate-comp-stats*
  "Reduces data with agg-comp-exec-stats, starting from init-val and passing
  window and include-sys? through to every step."
  [window include-sys? data init-val]
  (reduce (fn [acc exec-data]
            (agg-comp-exec-stats window include-sys? acc exec-data))
          init-val
          data))
| |
(defmulti aggregate-comp-stats
  "Dispatches on the :type of the first element of the last argument."
  (fn dispatch-fn [& args]
    (:type (first (last args)))))
| |
(defmethod aggregate-comp-stats :bolt
  [& args]
  ;; Seed accumulator for a bolt: every aggregate starts out empty. It is
  ;; appended to the caller's arguments as the final argument of
  ;; aggregate-comp-stats*.
  (let [empty-bolt-acc {:type :bolt
                        :executor-stats []
                        :cid+sid->input-stats {}
                        :sid->output-stats {}
                        :window->emitted {}
                        :window->transferred {}
                        :window->acked {}
                        :window->failed {}
                        :window->executed {}
                        :window->exec-lat-wgt-avg {}
                        :window->proc-lat-wgt-avg {}}]
    (apply aggregate-comp-stats* (conj (vec args) empty-bolt-acc))))
| |
(defmethod aggregate-comp-stats :spout
  [& args]
  ;; Seed accumulator for a spout: every aggregate starts out empty. It is
  ;; appended to the caller's arguments as the final argument of
  ;; aggregate-comp-stats*.
  (let [empty-spout-acc {:type :spout
                         :executor-stats []
                         :sid->output-stats {}
                         :window->emitted {}
                         :window->transferred {}
                         :window->acked {}
                         :window->failed {}
                         :window->comp-lat-wgt-avg {}}]
    (apply aggregate-comp-stats* (conj (vec args) empty-spout-acc))))
| |
;; Any dispatch value other than :bolt or :spout yields an empty map.
(defmethod aggregate-comp-stats :default
  [& _ignored]
  {})
| |
(defmulti post-aggregate-comp-stats
  "Takes three arguments and dispatches on the :type of the third one."
  (fn [_task->component _exec->host+port acc-data]
    (:type acc-data)))
| |
(defmethod post-aggregate-comp-stats :bolt
  [task->component exec->host+port acc-data]
  ;; Turns the accumulated bolt data into its final shape:
  ;;  - each input-stats map gets :execute-latency / :process-latency computed
  ;;    from its totals divided by :executed (0 when :executed is missing or
  ;;    not positive), and the *Total keys are dropped;
  ;;  - window keys of the count maps are converted with str;
  ;;  - windowed latencies come from compute-weighted-averages-per-window.
  ;; task->component and exec->host+port are not used by this method.
  (let [comp-stats (:stats acc-data)
        with-latencies
        (fn [m]
          (let [n-executed (:executed m)
                latencies (if (and n-executed (pos? n-executed))
                            {:execute-latency
                             (div (or (:executeLatencyTotal m) 0) n-executed)
                             :process-latency
                             (div (or (:processLatencyTotal m) 0) n-executed)}
                            {:execute-latency 0
                             :process-latency 0})]
            (dissoc (merge m latencies)
                    :executeLatencyTotal
                    :processLatencyTotal)))
        str-keyed (fn [k] (map-key str (k acc-data)))
        windowed-avg (fn [total-key]
                       (compute-weighted-averages-per-window acc-data
                                                             total-key
                                                             :window->executed))]
    {:type (:type acc-data)
     :num-tasks (:num-tasks comp-stats)
     :num-executors (:num-executors comp-stats)
     :cid+sid->input-stats (map-val with-latencies
                                    (:cid+sid->input-stats comp-stats))
     :sid->output-stats (:sid->output-stats comp-stats)
     :executor-stats (:executor-stats comp-stats)
     :window->emitted (str-keyed :window->emitted)
     :window->transferred (str-keyed :window->transferred)
     :window->execute-latency (windowed-avg :window->exec-lat-wgt-avg)
     :window->executed (str-keyed :window->executed)
     :window->process-latency (windowed-avg :window->proc-lat-wgt-avg)
     :window->acked (str-keyed :window->acked)
     :window->failed (str-keyed :window->failed)}))
| |
(defmethod post-aggregate-comp-stats :spout
  [task->component exec->host+port acc-data]
  ;; Turns the accumulated spout data into its final shape:
  ;;  - each output-stats map gets :complete-latency computed from
  ;;    :completeLatencyTotal divided by :acked (0 when :acked is missing or
  ;;    not positive), and :completeLatencyTotal is dropped;
  ;;  - window keys of the count maps are converted with str;
  ;;  - the windowed latency comes from compute-weighted-averages-per-window.
  ;; task->component and exec->host+port are not used by this method.
  (let [comp-stats (:stats acc-data)
        with-latency
        (fn [m]
          (let [n-acked (:acked m)
                latency (if (and n-acked (pos? n-acked))
                          {:complete-latency
                           (div (or (:completeLatencyTotal m) 0) n-acked)}
                          {:complete-latency 0})]
            (dissoc (merge m latency) :completeLatencyTotal)))
        str-keyed (fn [k] (map-key str (k acc-data)))]
    {:type (:type acc-data)
     :num-tasks (:num-tasks comp-stats)
     :num-executors (:num-executors comp-stats)
     :sid->output-stats (map-val with-latency
                                 (:sid->output-stats comp-stats))
     :executor-stats (:executor-stats comp-stats)
     :window->emitted (str-keyed :window->emitted)
     :window->transferred (str-keyed :window->transferred)
     :window->complete-latency
     (compute-weighted-averages-per-window acc-data
                                           :window->comp-lat-wgt-avg
                                           :window->acked)
     :window->acked (str-keyed :window->acked)
     :window->failed (str-keyed :window->failed)}))
| |
;; Any dispatch value other than :bolt or :spout yields an empty map.
(defmethod post-aggregate-comp-stats :default
  [& _ignored]
  {})
| |
(defn thriftify-exec-agg-stats
  "Builds an ExecutorAggregateStats for one executor of component comp-id.

  stats must contain :executor-id (a two-element sequence passed to the
  ExecutorInfo constructor), :host and :port; :uptime defaults to 0 when
  falsey. comp-type selects the converter applied to the whole stats map:
  :bolt uses thriftify-bolt-agg-stats and :spout uses
  thriftify-spout-agg-stats. Any other comp-type throws
  IllegalArgumentException."
  [comp-id comp-type {:keys [executor-id host port uptime] :as stats}]
  (let [exec-agg (ExecutorAggregateStats.)]
    (.set_exec_summary exec-agg
                       (ExecutorSummary.
                         (apply (fn [task-start task-end]
                                  (ExecutorInfo. task-start task-end))
                                executor-id)
                         comp-id
                         host
                         port
                         (or uptime 0)))
    (.set_stats exec-agg
                (case comp-type
                  :bolt (thriftify-bolt-agg-stats stats)
                  :spout (thriftify-spout-agg-stats stats)))
    exec-agg))
| |
(defn- thriftify-bolt-input-stats
  "Returns a map built from cid+sid->input-stats in which every key has been
  passed through to-global-stream-id and every value through
  thriftify-bolt-agg-stats."
  [cid+sid->input-stats]
  (into {}
        (map (fn [[cid+sid input-stats]]
               [(to-global-stream-id cid+sid)
                (thriftify-bolt-agg-stats input-stats)])
             cid+sid->input-stats)))
| |
(defn- thriftify-bolt-output-stats
  "Applies thriftify-bolt-agg-stats to every value of sid->output-stats."
  [sid->output-stats]
  (->> sid->output-stats
       (map-val thriftify-bolt-agg-stats)))
| |
(defn- thriftify-spout-output-stats
  "Applies thriftify-spout-agg-stats to every value of sid->output-stats."
  [sid->output-stats]
  (->> sid->output-stats
       (map-val thriftify-spout-agg-stats)))
| |
(defn thriftify-comp-page-data
  "Builds a ComponentPageInfo for comp-id from the post-aggregated map data.

  The windowed stats are gathered from data's :window->* entries (the
  latency/executed entries depend on (:type data)), reshaped with
  swap-map-order and converted per window. The Thrift component type and the
  converters are chosen by (component-type topology comp-id), which must be
  :bolt or :spout; otherwise IllegalArgumentException is thrown.
  The topology name is explicitly set to nil. :num-executors, :num-tasks and
  the input stats are set only when present."
  [topo-id topology comp-id data]
  (let [shared-stats {:emitted (:window->emitted data)
                      :transferred (:window->transferred data)
                      :acked (:window->acked data)
                      :failed (:window->failed data)}
        typed-stats (case (:type data)
                      :bolt {:execute-latency (:window->execute-latency data)
                             :process-latency (:window->process-latency data)
                             :executed (:window->executed data)}
                      :spout {:complete-latency
                              (:window->complete-latency data)}
                      {}) ; default
        raw-w->stats (swap-map-order (merge shared-stats typed-stats))
        [thrift-type exec-stats w->stats gsid->input-stats sid->output-stats]
        (case (component-type topology comp-id)
          :bolt [ComponentType/BOLT
                 (map (partial thriftify-exec-agg-stats comp-id :bolt)
                      (:executor-stats data))
                 (map-val thriftify-bolt-agg-stats raw-w->stats)
                 (thriftify-bolt-input-stats (:cid+sid->input-stats data))
                 (thriftify-bolt-output-stats (:sid->output-stats data))]
          :spout [ComponentType/SPOUT
                  (map (partial thriftify-exec-agg-stats comp-id :spout)
                       (:executor-stats data))
                  (map-val thriftify-spout-agg-stats raw-w->stats)
                  nil ;; spouts do not have input stats
                  (thriftify-spout-output-stats (:sid->output-stats data))])
        n-executors (:num-executors data)
        n-tasks (:num-tasks data)
        page-info (ComponentPageInfo. comp-id thrift-type)]
    (doto page-info
      (.set_topology_id topo-id)
      (.set_topology_name nil)
      (.set_window_to_stats w->stats)
      (.set_sid_to_output_stats sid->output-stats)
      (.set_exec_stats exec-stats))
    ;; Optional fields are left unset when the value is missing.
    (when n-executors
      (.set_num_executors page-info n-executors))
    (when n-tasks
      (.set_num_tasks page-info n-tasks))
    (when gsid->input-stats
      (.set_gsid_to_input_stats page-info gsid->input-stats))
    page-info))
| |
(defn agg-comp-execs-stats
  "Aggregate various executor statistics for a component from the given
  heartbeats.

  Runs four stages in order, each fed the previous stage's result as its last
  argument: extract-data-from-hb, aggregate-comp-stats,
  post-aggregate-comp-stats and thriftify-comp-page-data. Returns whatever
  thriftify-comp-page-data returns."
  [exec->host+port
   task->component
   beats
   window
   include-sys?
   topology-id
   topology
   component-id]
  ;; Original author's note: this iterates over each executor one time,
  ;; because of lazy evaluation.
  (let [hb-data (extract-data-from-hb exec->host+port
                                      task->component
                                      beats
                                      include-sys?
                                      topology
                                      component-id)
        aggregated (aggregate-comp-stats window include-sys? hb-data)
        finalized (post-aggregate-comp-stats task->component
                                             exec->host+port
                                             aggregated)]
    (thriftify-comp-page-data topology-id topology component-id finalized)))
| |
(defn expand-averages
  "For every [slice stream] entry of counts, returns
  {slice {stream [total count]}} where total is the count multiplied by the
  value found at [slice stream] in avg. Both arguments are first passed
  through clojurify-structure."
  [avg counts]
  (let [slice->avgs (clojurify-structure avg)
        slice->counts (clojurify-structure counts)
        expand-slice (fn [slice stream->count]
                       (into {}
                             (map (fn [[stream c]]
                                    [stream
                                     [(* c (get-in slice->avgs [slice stream]))
                                      c]])
                                  stream->count)))]
    (into {}
          (map (fn [[slice stream->count]]
                 [slice (expand-slice slice stream->count)])
               slice->counts))))
| |
(defn expand-averages-seq
  "Applies expand-averages to corresponding elements of average-seq and
  counts-seq, then merges the results: entries under the same outer and inner
  keys are combined with add-pairs."
  [average-seq counts-seq]
  (let [expanded (map expand-averages average-seq counts-seq)]
    (apply merge-with (partial merge-with add-pairs) expanded)))
| |
(defn- val-avg
  "Given a [total count] pair, returns total divided by count as a double.

  Returns 0 when the total equals 0 or when the count is zero, so an empty
  bucket never produces NaN, Infinity or a divide-by-zero error."
  [[t c]]
  ;; (= 0.0 0) is false in Clojure, so a double total of 0.0 paired with a
  ;; zero count used to fall through to the division and yield NaN. Guarding
  ;; on the count as well covers that case and a non-zero total with count 0.
  (if (or (= t 0) (zero? c))
    0
    (double (/ t c))))
| |
(defn aggregate-averages
  "Expands and merges average-seq and counts-seq with expand-averages-seq,
  then applies val-avg to every innermost value of the resulting two-level
  map."
  [average-seq counts-seq]
  (let [expanded (expand-averages-seq average-seq counts-seq)]
    (map-val (partial map-val val-avg) expanded)))
| |
(defn aggregate-avg-streams
  "Expands avg and counts with expand-averages, folds the per-stream pairs of
  each outer key together with add-pairs, and applies val-avg to each folded
  pair. Returns a map from outer key to that value."
  [avg counts]
  (let [sum-streams (fn [stream->pair]
                      (reduce add-pairs (vals stream->pair)))]
    (->> (expand-averages avg counts)
         (map-val sum-streams)
         (map-val val-avg))))
| |
(defn pre-process
  "Returns stream-summary with its :emitted and :transferred entries replaced
  by copies in which each window's map has been filtered with filter-key,
  using the predicate from (mk-include-sys-fn include-sys?)."
  [stream-summary include-sys?]
  (let [keep-key? (mk-include-sys-fn include-sys?)
        filter-windows (fn [window->stat]
                         (into {}
                               (map (fn [[window stat]]
                                      [window (filter-key keep-key? stat)])
                                    window->stat)))
        emitted (filter-windows (:emitted stream-summary))
        transferred (filter-windows (:transferred stream-summary))]
    (-> stream-summary
        (dissoc :emitted)
        (assoc :emitted emitted)
        (dissoc :transferred)
        (assoc :transferred transferred))))
| |
(defn aggregate-counts
  "Passes every element of counts-seq through clojurify-structure and merges
  the resulting two-level maps, adding values that share the same outer and
  inner keys."
  [counts-seq]
  (let [count-maps (map clojurify-structure counts-seq)]
    (apply merge-with (partial merge-with +) count-maps)))
| |
(defn aggregate-common-stats
  "Returns {:emitted ... :transferred ...}, each produced by aggregate-counts
  over the corresponding getter of every ExecutorStats in stats-seq."
  [stats-seq]
  (let [emitted-maps (map (fn [^ExecutorStats es] (.get_emitted es))
                          stats-seq)
        transferred-maps (map (fn [^ExecutorStats es] (.get_transferred es))
                              stats-seq)]
    {:emitted (aggregate-counts emitted-maps)
     :transferred (aggregate-counts transferred-maps)}))
| |
(defn aggregate-bolt-stats
  "Aggregates the bolt-specific stats of stats-seq (passed through collectify
  first) into one map.

  The result merges the pre-processed common stats with :acked, :failed and
  :executed (via aggregate-counts) and :process-latencies and
  :execute-latencies (via aggregate-averages). Process latencies are paired
  with the acked counts and execute latencies with the executed counts."
  [stats-seq include-sys?]
  (let [stats-seq (collectify stats-seq)
        bolt-of (fn [^ExecutorStats es] (.. es get_specific get_bolt))
        collect (fn [getter] (map (comp getter bolt-of) stats-seq))
        acked (collect (fn [^BoltStats b] (.get_acked b)))
        failed (collect (fn [^BoltStats b] (.get_failed b)))
        executed (collect (fn [^BoltStats b] (.get_executed b)))
        process-avgs (collect (fn [^BoltStats b] (.get_process_ms_avg b)))
        execute-avgs (collect (fn [^BoltStats b] (.get_execute_ms_avg b)))]
    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
           {:acked (aggregate-counts acked)
            :failed (aggregate-counts failed)
            :executed (aggregate-counts executed)
            :process-latencies (aggregate-averages process-avgs acked)
            :execute-latencies (aggregate-averages execute-avgs executed)})))
| |
(defn aggregate-spout-stats
  "Aggregates the spout-specific stats of stats-seq (passed through
  collectify first) into one map.

  The result merges the pre-processed common stats with :acked and :failed
  (via aggregate-counts) and :complete-latencies (via aggregate-averages,
  paired with the acked counts)."
  [stats-seq include-sys?]
  (let [stats-seq (collectify stats-seq)
        spout-of (fn [^ExecutorStats es] (.. es get_specific get_spout))
        collect (fn [getter] (map (comp getter spout-of) stats-seq))
        acked (collect (fn [^SpoutStats s] (.get_acked s)))
        failed (collect (fn [^SpoutStats s] (.get_failed s)))
        complete-avgs (collect (fn [^SpoutStats s] (.get_complete_ms_avg s)))]
    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
           {:acked (aggregate-counts acked)
            :failed (aggregate-counts failed)
            :complete-latencies (aggregate-averages complete-avgs acked)})))
| |
(defn get-filled-stats
  "Returns the non-nil results of calling .get_stats on each ExecutorSummary
  in summs."
  [summs]
  (keep (fn [^ExecutorSummary summ] (.get_stats summ))
        summs))
| |
(defn aggregate-spout-streams
  "Collapses the per-stream entries of a spout stats map: the count entries
  go through aggregate-count-streams and :complete-latencies goes through
  aggregate-avg-streams together with the :acked counts."
  [stats]
  (let [acked (:acked stats)
        count-keys [:acked :failed :emitted :transferred]
        counts (into {}
                     (map (fn [k] [k (aggregate-count-streams (k stats))])
                          count-keys))]
    (assoc counts
           :complete-latencies
           (aggregate-avg-streams (:complete-latencies stats) acked))))
| |
(defn spout-streams-stats
  "Takes the non-nil stats of summs (get-filled-stats), aggregates them with
  aggregate-spout-stats and collapses the result with
  aggregate-spout-streams."
  [summs include-sys?]
  (-> (get-filled-stats summs)
      (aggregate-spout-stats include-sys?)
      aggregate-spout-streams))
| |
(defn aggregate-bolt-streams
  "Collapses the per-stream entries of a bolt stats map: the count entries go
  through aggregate-count-streams, :process-latencies goes through
  aggregate-avg-streams with the :acked counts, and :execute-latencies with
  the :executed counts."
  [stats]
  (let [count-keys [:acked :failed :emitted :transferred :executed]
        counts (into {}
                     (map (fn [k] [k (aggregate-count-streams (k stats))])
                          count-keys))]
    (assoc counts
           :process-latencies
           (aggregate-avg-streams (:process-latencies stats) (:acked stats))
           :execute-latencies
           (aggregate-avg-streams (:execute-latencies stats)
                                  (:executed stats)))))
| |
(defn compute-executor-capacity
  "Computes the capacity figure for one ExecutorSummary.

  The executor's stats (when present) are aggregated as bolt stats with
  system streams included, and the entry under the key
  (str TEN-MIN-IN-SECONDS) is selected. The window is the smaller of the
  executor's uptime and TEN-MIN-IN-SECONDS. Returns
  (executed * execute-latency) / (1000 * window), or nil when the window is
  not positive. Missing values are treated as zero."
  [^ExecutorSummary e]
  (let [raw-stats (.get_stats e)
        ten-min-stats (when raw-stats
                        (-> raw-stats
                            (aggregate-bolt-stats true)
                            (aggregate-bolt-streams)
                            swap-map-order
                            (get (str TEN-MIN-IN-SECONDS))))
        uptime (nil-to-zero (.get_uptime_secs e))
        window (min uptime TEN-MIN-IN-SECONDS)
        executed (nil-to-zero (:executed ten-min-stats))
        latency (nil-to-zero (:execute-latencies ten-min-stats))]
    (when (> window 0)
      (div (* executed latency) (* 1000 window)))))
| |
(defn bolt-streams-stats
  "Takes the non-nil stats of summs (get-filled-stats), aggregates them with
  aggregate-bolt-stats and collapses the result with aggregate-bolt-streams."
  [summs include-sys?]
  (-> (get-filled-stats summs)
      (aggregate-bolt-stats include-sys?)
      aggregate-bolt-streams))
| |
(defn total-aggregate-stats
  "Combines the aggregated spout stats with the :emitted and :transferred
  entries of the aggregated bolt stats, adding values that share the same
  outer and inner keys."
  [spout-summs bolt-summs include-sys?]
  (let [spout-totals (aggregate-spout-streams
                       (aggregate-spout-stats (get-filled-stats spout-summs)
                                              include-sys?))
        bolt-totals (aggregate-bolt-streams
                      (aggregate-bolt-stats (get-filled-stats bolt-summs)
                                            include-sys?))
        ;; Include only keys that will be used. We want to count acked and
        ;; failed only for the "tuple trees," so we do not include those keys
        ;; from the bolt executors.
        bolt-common (select-keys bolt-totals [:emitted :transferred])]
    (merge-with (partial merge-with +) bolt-common spout-totals)))
| |
(defn error-subset
  "Returns a string made of at most the first 200 elements of error-str.
  A nil argument yields the empty string."
  [error-str]
  (->> error-str
       (take 200)
       (apply str)))
| |
(defn most-recent-error
  "Returns the error-subset of the error text of the ErrorInfo that sorts
  last by .get_error_time_secs, or the empty string when errors-list yields
  no element."
  [errors-list]
  (if-let [^ErrorInfo newest (last (sort-by (fn [^ErrorInfo err]
                                              (.get_error_time_secs err))
                                            errors-list))]
    (error-subset (.get_error newest))
    ""))
| |
(defn float-str
  "Formats n as a float with three decimal places. Returns the string \"0\"
  when n is nil or false."
  [n]
  (if-not n
    "0"
    (format "%.3f" (float n))))
| |
(defn compute-bolt-capacity
  "Returns the largest value of compute-executor-capacity over executors,
  treating nil capacities as zero. Returns 0 when executors is empty."
  [executors]
  (let [capacities (->> executors
                        (map compute-executor-capacity)
                        (map nil-to-zero))]
    ;; (apply max ()) calls max with no arguments and throws ArityException,
    ;; so an empty executor list is handled explicitly.
    (if (seq capacities)
      (apply max capacities)
      0)))