Merge pull request #517 from jasonjckn/system-bolt-3
Added SystemBolt.
diff --git a/bin/build_release.sh b/bin/build_release.sh
index fdec129..089065f 100644
--- a/bin/build_release.sh
+++ b/bin/build_release.sh
@@ -14,11 +14,11 @@
rm -rf _release
rm -f *.zip
-$LEIN with-profile release clean
-$LEIN with-profile release deps
-$LEIN with-profile release jar
-$LEIN with-profile release pom
-mvn dependency:copy-dependencies
+$LEIN with-profile release clean || exit 1
+$LEIN with-profile release deps || exit 1
+$LEIN with-profile release jar || exit 1
+$LEIN with-profile release pom || exit 1
+mvn dependency:copy-dependencies || exit 1
mkdir -p $DIR/lib
cp target/storm-*.jar $DIR/storm-${RELEASE}.jar
diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj
index da93fdd..d1456ea 100644
--- a/src/clj/backtype/storm/daemon/common.clj
+++ b/src/clj/backtype/storm/daemon/common.clj
@@ -5,7 +5,7 @@
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.task WorkerTopologyContext])
(:import [backtype.storm Constants])
- (:import [backtype.storm.spout NoOpSpout])
+ (:import [backtype.storm.metric SystemBolt])
(:require [clojure.set :as set])
(:require [backtype.storm.daemon.acker :as acker])
(:require [backtype.storm.thrift :as thrift])
@@ -241,8 +241,9 @@
(number-duplicates)
(map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
-(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
- (let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
+(defn metrics-consumer-bolt-specs [storm-conf topology]
+ (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
+ inputs (->> (for [comp-id component-ids-that-emit-metrics]
{[comp-id METRICS-STREAM-ID] :shuffle})
(into {}))
@@ -261,27 +262,28 @@
(metrics-consumer-register-ids storm-conf)
(get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
-(defn add-metric-components! [storm-conf ^StormTopology topology]
- (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
+(defn add-metric-components! [storm-conf ^StormTopology topology]
+ (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
(.put_to_bolts topology comp-id bolt-spec)))
-(defn add-system-components! [^StormTopology topology]
- (let [system-spout (thrift/mk-spout-spec*
- (NoOpSpout.)
- {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
- METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
- :p 0
- :conf {TOPOLOGY-TASKS 0})]
- (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
+(defn add-system-components! [conf ^StormTopology topology]
+ (let [system-bolt-spec (thrift/mk-bolt-spec*
+ {}
+ (SystemBolt.)
+ {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+ METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
+ :p 0
+ :conf {TOPOLOGY-TASKS 0})]
+ (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
(defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
- (add-metric-components! storm-conf ret)
- (add-metric-streams! ret)
+ (add-metric-components! storm-conf ret)
+ (add-system-components! storm-conf ret)
+ (add-metric-streams! ret)
(add-system-streams! ret)
- (add-system-components! ret)
(validate-structure! ret)
ret
))
diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj
index 2e176bc..a816237 100644
--- a/src/clj/backtype/storm/daemon/executor.clj
+++ b/src/clj/backtype/storm/daemon/executor.clj
@@ -111,8 +111,7 @@
(defn executor-type [^WorkerTopologyContext context component-id]
(let [topology (.getRawTopology context)
spouts (.get_spouts topology)
- bolts (.get_bolts topology)
- ]
+ bolts (.get_bolts topology)]
(cond (contains? spouts component-id) :spout
(contains? bolts component-id) :bolt
:else (throw-runtime "Could not find " component-id " in topology " topology))))
@@ -182,7 +181,7 @@
(this task tuple nil)
)))
-(defn executor-data [worker executor-id]
+(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker)
task-ids (executor-id->tasks executor-id)
component-id (.getComponentId worker-context (first task-ids))
@@ -253,7 +252,7 @@
(fn []
(disruptor/publish
receive-queue
- [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
+ [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
(let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
@@ -293,11 +292,11 @@
(fn []
(disruptor/publish
receive-queue
- [[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
+ [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
)))))))
(defn mk-executor [worker executor-id]
- (let [executor-data (executor-data worker executor-id)
+ (let [executor-data (mk-executor-data worker executor-id)
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
task-datas (->> executor-data
:task-ids
diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj
index c27a276..fda038f 100644
--- a/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/src/clj/backtype/storm/daemon/supervisor.clj
@@ -85,8 +85,8 @@
(let [local-assignment (assigned-executors (:port worker-heartbeat))]
(and local-assignment
(= (:storm-id worker-heartbeat) (:storm-id local-assignment))
- (= (set (:executors worker-heartbeat)) (set (:executors local-assignment))))
- ))
+ (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
+ (set (:executors local-assignment))))))
(defn read-allocated-workers
"Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
diff --git a/src/clj/backtype/storm/daemon/worker.clj b/src/clj/backtype/storm/daemon/worker.clj
index f614fa4..157fb32 100644
--- a/src/clj/backtype/storm/daemon/worker.clj
+++ b/src/clj/backtype/storm/daemon/worker.clj
@@ -9,15 +9,16 @@
(defmulti mk-suicide-fn cluster-mode)
-(defn read-worker-executors [storm-cluster-state storm-id assignment-id port]
+(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
(let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
(doall
+ (concat
+ [Constants/SYSTEM_EXECUTOR_ID]
(mapcat (fn [[executor loc]]
- (if (= loc [assignment-id port])
- [executor]
- ))
- assignment))
- ))
+ (if (= loc [assignment-id port])
+ [executor]
+ ))
+ assignment)))))
(defnk do-executor-heartbeats [worker :executors nil]
;; stats is how we know what executors are assigned to this worker
@@ -144,7 +145,7 @@
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
storm-conf (read-supervisor-storm-conf conf storm-id)
- executors (set (read-worker-executors storm-cluster-state storm-id assignment-id port))
+ executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
diff --git a/src/jvm/backtype/storm/Constants.java b/src/jvm/backtype/storm/Constants.java
index 7052789..a8ade3c 100644
--- a/src/jvm/backtype/storm/Constants.java
+++ b/src/jvm/backtype/storm/Constants.java
@@ -1,11 +1,14 @@
package backtype.storm;
import backtype.storm.coordination.CoordinatedBolt;
+import clojure.lang.RT;
public class Constants {
public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
+ public static final long SYSTEM_TASK_ID = -1;
+ public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
diff --git a/src/jvm/backtype/storm/metric/SystemBolt.java b/src/jvm/backtype/storm/metric/SystemBolt.java
new file mode 100644
index 0000000..da8bd2b
--- /dev/null
+++ b/src/jvm/backtype/storm/metric/SystemBolt.java
@@ -0,0 +1,139 @@
+package backtype.storm.metric;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.AssignableMetric;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.task.IBolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import clojure.lang.AFn;
+import clojure.lang.IFn;
+import clojure.lang.RT;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.*;
+import java.util.List;
+import java.util.Map;
+
+
+// There is one task inside one executor for each worker of the topology.
+// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
+// This bolt was conceived to export worker stats via metrics api.
+public class SystemBolt implements IBolt {
+ private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
+ private static boolean _prepareWasCalled = false;
+
+ private static class MemoryUsageMetric implements IMetric {
+ IFn _getUsage;
+ public MemoryUsageMetric(IFn getUsage) {
+ _getUsage = getUsage;
+ }
+ @Override
+ public Object getValueAndReset() {
+ MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
+ return ImmutableMap.builder()
+ .put("maxBytes", memUsage.getMax())
+ .put("committedBytes", memUsage.getCommitted())
+ .put("initBytes", memUsage.getInit())
+ .put("usedBytes", memUsage.getUsed())
+ .put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed())
+ .put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed())
+ .build();
+ }
+ }
+
+ // canonically the metrics data exported is time bucketed when doing counts.
+ // convert the absolute values here into time buckets.
+ private static class GarbageCollectorMetric implements IMetric {
+ GarbageCollectorMXBean _gcBean;
+ Long _collectionCount;
+ Long _collectionTime;
+ public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
+ _gcBean = gcBean;
+ }
+ @Override
+ public Object getValueAndReset() {
+ Long collectionCountP = _gcBean.getCollectionCount();
+ Long collectionTimeP = _gcBean.getCollectionTime();
+
+ Map ret = null;
+ if(_collectionCount!=null && _collectionTime!=null) {
+ ret = ImmutableMap.builder()
+ .put("count", collectionCountP - _collectionCount)
+ .put("timeMs", collectionTimeP - _collectionTime)
+ .build();
+ }
+
+ _collectionCount = collectionCountP;
+ _collectionTime = collectionTimeP;
+ return ret;
+ }
+ }
+
+ @Override
+ public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
+ if(_prepareWasCalled && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
+ throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
+ }
+ _prepareWasCalled = true;
+
+ int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+
+ final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
+
+ context.registerMetric("uptimeSecs", new IMetric() {
+ @Override
+ public Object getValueAndReset() {
+ return jvmRT.getUptime()/1000.0;
+ }
+ }, bucketSize);
+
+ context.registerMetric("startTimeSecs", new IMetric() {
+ @Override
+ public Object getValueAndReset() {
+ return jvmRT.getStartTime()/1000.0;
+ }
+ }, bucketSize);
+
+ context.registerMetric("newWorkerEvent", new IMetric() {
+ boolean doEvent = true;
+
+ @Override
+ public Object getValueAndReset() {
+ if (doEvent) {
+ doEvent = false;
+ return 1;
+ } else return 0;
+ }
+ }, bucketSize);
+
+ final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
+
+ context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
+ public Object invoke() {
+ return jvmMemRT.getHeapMemoryUsage();
+ }
+ }), bucketSize);
+ context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
+ public Object invoke() {
+ return jvmMemRT.getNonHeapMemoryUsage();
+ }
+ }), bucketSize);
+
+ for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
+ context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
+ }
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
+ }
+
+ @Override
+ public void cleanup() {
+ }
+}
diff --git a/src/jvm/backtype/storm/spout/NoOpSpout.java b/src/jvm/backtype/storm/spout/NoOpSpout.java
deleted file mode 100644
index 03586dc..0000000
--- a/src/jvm/backtype/storm/spout/NoOpSpout.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-
-public class NoOpSpout implements ISpout {
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void deactivate() {
- }
-
- @Override
- public void nextTuple() {
- }
-
- @Override
- public void ack(Object msgId) {
- }
-
- @Override
- public void fail(Object msgId) {
- }
-
-}
diff --git a/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
index e9e638e..3065b23 100644
--- a/src/jvm/backtype/storm/task/GeneralTopologyContext.java
+++ b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
@@ -63,7 +63,7 @@
* @return the component id for the input task id
*/
public String getComponentId(int taskId) {
- if(taskId==-1) {
+ if(taskId==Constants.SYSTEM_TASK_ID) {
return Constants.SYSTEM_COMPONENT_ID;
} else {
return _taskToComponent.get(taskId);
diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj
index 30ba9a7..62d7af2 100644
--- a/test/clj/backtype/storm/metrics_test.clj
+++ b/test/clj/backtype/storm/metrics_test.clj
@@ -75,6 +75,9 @@
(wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
(is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
+(defmacro assert-metric-data-exists! [comp-id metric-name]
+ `(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name))))
+
(deftest test-custom-metric
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
@@ -103,7 +106,7 @@
(deftest test-builtin-metrics-1
(with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
@@ -142,7 +145,6 @@
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
- TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
(let [feeder (feeder-spout ["field1"])
@@ -153,7 +155,7 @@
{"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]
(submit-local-topology (:nimbus cluster)
"metrics-tester"
- {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20}
+ {}
topology)
(.feed feeder ["a"] 1)
@@ -175,11 +177,80 @@
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
- (advance-cluster-time cluster 30)
- (assert-failed tracker 2)
- (assert-buckets! "myspout" "__fail-count/default" [1])
+ (advance-cluster-time cluster 15)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
(assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0]))))
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])
+
+ (.feed feeder ["c"] 3)
+ (advance-cluster-time cluster 15)
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0])
+ (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0])
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0]))))
+
+(deftest test-builtin-metrics-3
+ (with-simulated-time-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ TOPOLOGY-STATS-SAMPLE-RATE 1.0
+ TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
+ TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"myspout" (thrift/mk-spout-spec feeder)}
+ {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})]
+ (submit-local-topology (:nimbus cluster)
+ "timeout-tester"
+ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+ topology)
+ (.feed feeder ["a"] 1)
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (advance-cluster-time cluster 9)
+ (assert-acked tracker 1 3)
+ (assert-buckets! "myspout" "__ack-count/default" [2])
+ (assert-buckets! "myspout" "__emit-count/default" [3])
+ (assert-buckets! "myspout" "__transfer-count/default" [3])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [2])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [3])
+
+ (is (not (.isFailed tracker 2)))
+ (advance-cluster-time cluster 30)
+ (assert-failed tracker 2)
+ (assert-buckets! "myspout" "__fail-count/default" [1])
+ (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0])
+ (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0])
+ (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0]))))
+
+(deftest test-system-bolt
+ (with-simulated-time-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
+ (let [feeder (feeder-spout ["field1"])
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {})]
+ (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 70)
+ (assert-buckets! "__system" "newWorkerEvent" [1])
+ (assert-metric-data-exists! "__system" "memory/heap")
+ (assert-metric-data-exists! "__system" "uptimeSecs")
+ (assert-metric-data-exists! "__system" "startTimeSecs")
+ (assert-metric-data-exists! "__system" "memory/nonHeap")
+
+ (advance-cluster-time cluster 180)
+ (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0])
+ (assert-metric-data-exists! "__system" "memory/heap"))))
+
+