Merge pull request #517 from jasonjckn/system-bolt-3

Added SystemBolt.
diff --git a/bin/build_release.sh b/bin/build_release.sh
index fdec129..089065f 100644
--- a/bin/build_release.sh
+++ b/bin/build_release.sh
@@ -14,11 +14,11 @@
 
 rm -rf _release
 rm -f *.zip 
-$LEIN with-profile release clean
-$LEIN with-profile release deps
-$LEIN with-profile release jar
-$LEIN with-profile release pom
-mvn dependency:copy-dependencies
+$LEIN with-profile release clean || exit 1
+$LEIN with-profile release deps || exit 1
+$LEIN with-profile release jar || exit 1
+$LEIN with-profile release pom || exit 1
+mvn dependency:copy-dependencies || exit 1
 
 mkdir -p $DIR/lib
 cp target/storm-*.jar $DIR/storm-${RELEASE}.jar
diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj
index da93fdd..d1456ea 100644
--- a/src/clj/backtype/storm/daemon/common.clj
+++ b/src/clj/backtype/storm/daemon/common.clj
@@ -5,7 +5,7 @@
   (:import [backtype.storm.utils Utils])
   (:import [backtype.storm.task WorkerTopologyContext])
   (:import [backtype.storm Constants])
-  (:import [backtype.storm.spout NoOpSpout])
+  (:import [backtype.storm.metric SystemBolt])
   (:require [clojure.set :as set])  
   (:require [backtype.storm.daemon.acker :as acker])
   (:require [backtype.storm.thrift :as thrift])
@@ -241,8 +241,9 @@
        (number-duplicates)
        (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
 
-(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
-  (let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
+(defn metrics-consumer-bolt-specs [storm-conf topology]
+  (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
+        inputs (->> (for [comp-id component-ids-that-emit-metrics]
                       {[comp-id METRICS-STREAM-ID] :shuffle})
                     (into {}))
         
@@ -261,27 +262,28 @@
      (metrics-consumer-register-ids storm-conf)
      (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
 
-(defn add-metric-components! [storm-conf ^StormTopology topology]
-  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
+(defn add-metric-components! [storm-conf ^StormTopology topology]  
+  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
     (.put_to_bolts topology comp-id bolt-spec)))
 
-(defn add-system-components! [^StormTopology topology]
-  (let [system-spout (thrift/mk-spout-spec*
-                      (NoOpSpout.)
-                      {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
-                       METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
-                      :p 0
-                      :conf {TOPOLOGY-TASKS 0})]
-    (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
+(defn add-system-components! [conf ^StormTopology topology]
+  (let [system-bolt-spec (thrift/mk-bolt-spec*
+                          {}
+                          (SystemBolt.)
+                          {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+                           METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}                          
+                          :p 0
+                          :conf {TOPOLOGY-TASKS 0})]
+    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
 
 (defn system-topology! [storm-conf ^StormTopology topology]
   (validate-basic! topology)
   (let [ret (.deepCopy topology)]
     (add-acker! storm-conf ret)
-    (add-metric-components! storm-conf ret)
-    (add-metric-streams! ret)    
+    (add-metric-components! storm-conf ret)    
+    (add-system-components! storm-conf ret)
+    (add-metric-streams! ret)
     (add-system-streams! ret)
-    (add-system-components! ret)
     (validate-structure! ret)
     ret
     ))
diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj
index 2e176bc..a816237 100644
--- a/src/clj/backtype/storm/daemon/executor.clj
+++ b/src/clj/backtype/storm/daemon/executor.clj
@@ -111,8 +111,7 @@
 (defn executor-type [^WorkerTopologyContext context component-id]
   (let [topology (.getRawTopology context)
         spouts (.get_spouts topology)
-        bolts (.get_bolts topology)
-        ]
+        bolts (.get_bolts topology)]
     (cond (contains? spouts component-id) :spout
           (contains? bolts component-id) :bolt
           :else (throw-runtime "Could not find " component-id " in topology " topology))))
@@ -182,7 +181,7 @@
       (this task tuple nil)
       )))
 
-(defn executor-data [worker executor-id]
+(defn mk-executor-data [worker executor-id]
   (let [worker-context (worker-context worker)
         task-ids (executor-id->tasks executor-id)
         component-id (.getComponentId worker-context (first task-ids))
@@ -253,7 +252,7 @@
        (fn []
          (disruptor/publish
           receive-queue
-          [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
+          [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
 
 (defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
   (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
@@ -293,11 +292,11 @@
           (fn []
             (disruptor/publish
               receive-queue
-              [[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
+              [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
               )))))))
 
 (defn mk-executor [worker executor-id]
-  (let [executor-data (executor-data worker executor-id)
+  (let [executor-data (mk-executor-data worker executor-id)
         _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
         task-datas (->> executor-data
                         :task-ids
diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj
index c27a276..fda038f 100644
--- a/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/src/clj/backtype/storm/daemon/supervisor.clj
@@ -85,8 +85,8 @@
   (let [local-assignment (assigned-executors (:port worker-heartbeat))]
     (and local-assignment
          (= (:storm-id worker-heartbeat) (:storm-id local-assignment))
-         (= (set (:executors worker-heartbeat)) (set (:executors local-assignment))))
-    ))
+         (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
+            (set (:executors local-assignment))))))
 
 (defn read-allocated-workers
   "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
diff --git a/src/clj/backtype/storm/daemon/worker.clj b/src/clj/backtype/storm/daemon/worker.clj
index f614fa4..157fb32 100644
--- a/src/clj/backtype/storm/daemon/worker.clj
+++ b/src/clj/backtype/storm/daemon/worker.clj
@@ -9,15 +9,16 @@
 
 (defmulti mk-suicide-fn cluster-mode)
 
-(defn read-worker-executors [storm-cluster-state storm-id assignment-id port]
+(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
   (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
     (doall
+     (concat     
+      [Constants/SYSTEM_EXECUTOR_ID]
       (mapcat (fn [[executor loc]]
-              (if (= loc [assignment-id port])
-                [executor]
-                ))
-            assignment))
-    ))
+                (if (= loc [assignment-id port])
+                  [executor]
+                  ))
+              assignment)))))
 
 (defnk do-executor-heartbeats [worker :executors nil]
   ;; stats is how we know what executors are assigned to this worker 
@@ -144,7 +145,7 @@
   (let [cluster-state (cluster/mk-distributed-cluster-state conf)
         storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
         storm-conf (read-supervisor-storm-conf conf storm-id)
-        executors (set (read-worker-executors storm-cluster-state storm-id assignment-id port))
+        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
         transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
diff --git a/src/jvm/backtype/storm/Constants.java b/src/jvm/backtype/storm/Constants.java
index 7052789..a8ade3c 100644
--- a/src/jvm/backtype/storm/Constants.java
+++ b/src/jvm/backtype/storm/Constants.java
@@ -1,11 +1,14 @@
 package backtype.storm;
 
 import backtype.storm.coordination.CoordinatedBolt;
+import clojure.lang.RT;
 
 
 public class Constants {
     public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; 
 
+    public static final long SYSTEM_TASK_ID = -1;
+    public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
     public static final String SYSTEM_COMPONENT_ID = "__system";
     public static final String SYSTEM_TICK_STREAM_ID = "__tick";
     public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
diff --git a/src/jvm/backtype/storm/metric/SystemBolt.java b/src/jvm/backtype/storm/metric/SystemBolt.java
new file mode 100644
index 0000000..da8bd2b
--- /dev/null
+++ b/src/jvm/backtype/storm/metric/SystemBolt.java
@@ -0,0 +1,139 @@
+package backtype.storm.metric;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.AssignableMetric;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.task.IBolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import clojure.lang.AFn;
+import clojure.lang.IFn;
+import clojure.lang.RT;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.*;
+import java.util.List;
+import java.util.Map;
+
+
+// There is one task inside one executor for each worker of the topology.
+// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
+// This bolt was conceived to export worker stats via metrics api.
+public class SystemBolt implements IBolt {
+    private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
+    private static boolean _prepareWasCalled = false;
+
+    private static class MemoryUsageMetric implements IMetric {
+        IFn _getUsage;
+        public MemoryUsageMetric(IFn getUsage) {
+            _getUsage = getUsage;
+        }
+        @Override
+        public Object getValueAndReset() {
+            MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
+            return ImmutableMap.builder()
+                    .put("maxBytes", memUsage.getMax())
+                    .put("committedBytes", memUsage.getCommitted())
+                    .put("initBytes", memUsage.getInit())
+                    .put("usedBytes", memUsage.getUsed())
+                    .put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed())
+                    .put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed())
+                    .build();
+        }
+    }
+
+    // canonically the metrics data exported is time bucketed when doing counts.
+    // convert the absolute values here into time buckets.
+    private static class GarbageCollectorMetric implements IMetric {
+        GarbageCollectorMXBean _gcBean;
+        Long _collectionCount;
+        Long _collectionTime;
+        public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
+            _gcBean = gcBean;
+        }
+        @Override
+        public Object getValueAndReset() {
+            Long collectionCountP = _gcBean.getCollectionCount();
+            Long collectionTimeP = _gcBean.getCollectionCount();
+
+            Map ret = null;
+            if(_collectionCount!=null && _collectionTime!=null) {
+                ret = ImmutableMap.builder()
+                        .put("count", collectionCountP - _collectionCount)
+                        .put("timeMs", collectionTimeP - _collectionTime)
+                        .build();
+            }
+
+            _collectionCount = collectionCountP;
+            _collectionTime = collectionTimeP;
+            return ret;
+        }
+    }
+
+    @Override
+    public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
+        if(_prepareWasCalled && stormConf.get(Config.STORM_CLUSTER_MODE) != "local") {
+            throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
+        }
+        _prepareWasCalled = true;
+
+        int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+
+        final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
+
+        context.registerMetric("uptimeSecs", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                return jvmRT.getUptime()/1000.0;
+            }
+        }, bucketSize);
+
+        context.registerMetric("startTimeSecs", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                return jvmRT.getStartTime()/1000.0;
+            }
+        }, bucketSize);
+
+        context.registerMetric("newWorkerEvent", new IMetric() {
+            boolean doEvent = true;
+
+            @Override
+            public Object getValueAndReset() {
+                if (doEvent) {
+                    doEvent = false;
+                    return 1;
+                } else return 0;
+            }
+        }, bucketSize);
+
+        final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
+
+        context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
+            public Object invoke() {
+                return jvmMemRT.getHeapMemoryUsage();
+            }
+        }), bucketSize);
+        context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
+            public Object invoke() {
+                return jvmMemRT.getNonHeapMemoryUsage();
+            }
+        }), bucketSize);
+
+        for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
+            context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
+    }
+
+    @Override
+    public void cleanup() {
+    }
+}
diff --git a/src/jvm/backtype/storm/spout/NoOpSpout.java b/src/jvm/backtype/storm/spout/NoOpSpout.java
deleted file mode 100644
index 03586dc..0000000
--- a/src/jvm/backtype/storm/spout/NoOpSpout.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-
-public class NoOpSpout implements ISpout {
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public void activate() {
-    }
-
-    @Override
-    public void deactivate() {
-    }
-
-    @Override
-    public void nextTuple() {
-    }
-
-    @Override
-    public void ack(Object msgId) {
-    }
-
-    @Override
-    public void fail(Object msgId) {
-    }
-    
-}
diff --git a/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
index e9e638e..3065b23 100644
--- a/src/jvm/backtype/storm/task/GeneralTopologyContext.java
+++ b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
@@ -63,7 +63,7 @@
      * @return the component id for the input task id
      */
     public String getComponentId(int taskId) {
-        if(taskId==-1) {
+        if(taskId==Constants.SYSTEM_TASK_ID) {
             return Constants.SYSTEM_COMPONENT_ID;
         } else {
             return _taskToComponent.get(taskId);
diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj
index 30ba9a7..62d7af2 100644
--- a/test/clj/backtype/storm/metrics_test.clj
+++ b/test/clj/backtype/storm/metrics_test.clj
@@ -75,6 +75,9 @@
        (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
        (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
 
+(defmacro assert-metric-data-exists! [comp-id metric-name]
+  `(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name))))
+
 (deftest test-custom-metric
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
@@ -103,7 +106,7 @@
 
 (deftest test-builtin-metrics-1
   (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER                    
                            [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
@@ -142,7 +145,6 @@
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
-                           TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
     (let [feeder (feeder-spout ["field1"])
@@ -153,7 +155,7 @@
                     {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]      
       (submit-local-topology (:nimbus cluster)
                              "metrics-tester"
-                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20}
+                             {}
                              topology)
       
       (.feed feeder ["a"] 1)
@@ -175,11 +177,80 @@
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
 
-      (advance-cluster-time cluster 30)
-      (assert-failed tracker 2)   
-      (assert-buckets! "myspout" "__fail-count/default" [1])
+      (advance-cluster-time cluster 15)      
       (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
       (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])                  
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0]))))
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])
+      
+      (.feed feeder ["c"] 3)            
+      (advance-cluster-time cluster 15)      
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0])
+      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0])
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0])
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0])
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0]))))
+
+(deftest test-builtin-metrics-3
+  (with-simulated-time-local-cluster
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           TOPOLOGY-STATS-SAMPLE-RATE 1.0
+                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
+                           TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                    {"myspout" (thrift/mk-spout-spec feeder)}
+                    {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})]      
+      (submit-local-topology (:nimbus cluster)
+                             "timeout-tester"
+                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                             topology)
+      (.feed feeder ["a"] 1)
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 9)
+      (assert-acked tracker 1 3)
+      (assert-buckets! "myspout" "__ack-count/default" [2])
+      (assert-buckets! "myspout" "__emit-count/default" [3])
+      (assert-buckets! "myspout" "__transfer-count/default" [3])
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2])
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3])
+      
+      (is (not (.isFailed tracker 2)))
+      (advance-cluster-time cluster 30)
+      (assert-failed tracker 2)
+      (assert-buckets! "myspout" "__fail-count/default" [1])
+      (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0])
+      (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0])
+      (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0])
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0])
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0]))))
+
+(deftest test-system-bolt
+  (with-simulated-time-local-cluster
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
+    (let [feeder (feeder-spout ["field1"])
+          topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec feeder)}
+                    {})]      
+      (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+      (.feed feeder ["a"] 1)
+      (advance-cluster-time cluster 70)
+      (assert-buckets! "__system" "newWorkerEvent" [1])
+      (assert-buckets! "__system" "configTopologyWorkers" [1])
+      (assert-metric-data-exists! "__system" "uptimeSecs")
+      (assert-metric-data-exists! "__system" "startTimeSecs")
+      (assert-metric-data-exists! "__system" "topologyPartialUptimeSecs")
+
+      (advance-cluster-time cluster 180)
+      (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0])
+      (assert-buckets! "__system" "configTopologyWorkers" [1 1 1 1]))))
+
+