Add some metrics in the scheduler.
diff --git a/mysos/scheduler/http.py b/mysos/scheduler/http.py
index c38ae1c..afe74a5 100644
--- a/mysos/scheduler/http.py
+++ b/mysos/scheduler/http.py
@@ -10,7 +10,7 @@
 
 
 class MysosServer(HttpServer):
-  def __init__(self, scheduler, asset_dir):
+  def __init__(self, scheduler, asset_dir, metric_sampler):
     super(MysosServer, self).__init__()
     self._scheduler = scheduler
     self._asset_dir = asset_dir
@@ -20,6 +20,8 @@
 
     self._clusters_template = Template(filename=os.path.join(self._template_dir, 'clusters.html'))
 
+    self._metric_sampler = metric_sampler
+
   @route('/clusters/<clustername>', method=['POST'])
   def create(self, clustername):
     """Create a db cluster."""
@@ -72,3 +74,7 @@
   @route('/static/<filepath:path>', method=['GET'])
   def serve_static(self, filepath):
     return static_file(filepath, root=self._static_dir)
+
+  @route("/vars.json")
+  def serve_vars_json(self, var=None, value=None):
+    return self._metric_sampler.sample()
diff --git a/mysos/scheduler/launcher.py b/mysos/scheduler/launcher.py
index 6167546..da245a8 100644
--- a/mysos/scheduler/launcher.py
+++ b/mysos/scheduler/launcher.py
@@ -132,9 +132,15 @@
   @property
   def cluster_info(self):
     with self._lock:
-      ClusterInfo = namedtuple('ClusterInfo', ('name, user, num_nodes'))
+      ClusterInfo = namedtuple('ClusterInfo', [
+          'name', 'user', 'num_nodes', 'total_cpus', 'total_mem_mb', 'total_disk_mb'])
       return ClusterInfo(
-          name=self._cluster.name, user=self._cluster.user, num_nodes=self._cluster.num_nodes)
+          name=self._cluster.name,
+          user=self._cluster.user,
+          num_nodes=self._cluster.num_nodes,
+          total_cpus=self._cluster.cpus * self._cluster.num_nodes,
+          total_mem_mb=self._cluster.mem.as_(Data.MB) * self._cluster.num_nodes,
+          total_disk_mb=self._cluster.disk.as_(Data.MB) * self._cluster.num_nodes)
 
   def launch(self, offer):
     """
diff --git a/mysos/scheduler/mysos_scheduler.py b/mysos/scheduler/mysos_scheduler.py
index 7ff6a25..d1d6c1a 100644
--- a/mysos/scheduler/mysos_scheduler.py
+++ b/mysos/scheduler/mysos_scheduler.py
@@ -17,6 +17,7 @@
 from twitter.common.exceptions import ExceptionalThread
 from twitter.common.http import HttpServer
 from twitter.common.log.options import LogOptions
+from twitter.common.metrics import MetricSampler, RootMetrics
 from twitter.common.quantity import Time
 from twitter.common.quantity.parse_simple import InvalidTime, parse_time
 import yaml
@@ -275,6 +276,8 @@
         executor_environ=options.executor_environ,
         framework_role=options.framework_role)
 
+    RootMetrics().register_observable('scheduler', scheduler)
+
     if fw_principal and fw_secret:
       cred = Credential(principal=fw_principal, secret=fw_secret)
       scheduler_driver = mesos.native.MesosSchedulerDriver(
@@ -290,8 +293,11 @@
 
     scheduler_driver.start()
 
+    metric_sampler = MetricSampler(RootMetrics())
+    metric_sampler.start()
+
     server = HttpServer()
-    server.mount_routes(MysosServer(scheduler, web_assets_dir))
+    server.mount_routes(MysosServer(scheduler, web_assets_dir, metric_sampler))
 
     et = ExceptionalThread(
         target=server.run, args=('0.0.0.0', options.api_port, 'cherrypy'))
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index 8c1e36d..9add38a 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -17,6 +17,7 @@
 import mesos.interface.mesos_pb2 as mesos_pb2
 from twitter.common import log
 from twitter.common.collections.orderedset import OrderedSet
+from twitter.common.metrics import AtomicGauge, MutatorGauge, Observable
 from twitter.common.quantity import Amount, Data, Time
 from twitter.common.quantity.parse_simple import InvalidData, parse_data
 
@@ -31,7 +32,7 @@
 INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION = Amount(sys.maxint / 2, Time.NANOSECONDS)
 
 
-class MysosScheduler(mesos.interface.Scheduler):
+class MysosScheduler(mesos.interface.Scheduler, Observable):
 
   class Error(Exception): pass
 
@@ -112,6 +113,17 @@
     self.connected = threading.Event()  # An event set when the scheduler is first connected to
                                         # Mesos. The scheduler tolerates later disconnections.
 
+    self._cluster_count = self.metrics.register(AtomicGauge('cluster_count', 0))
+
+    # Total resources requested by the scheduler's clients. When a cluster is created its resources
+    # are added to the total; when it's deleted its resources are subtracted from the total.
+    # NOTE: These are 'requested' resources that are independent of resources offered by Mesos or
+    # allocated to or used by Mysos tasks running on Mesos cluster.
+    self._total_requested_cpus = self.metrics.register(MutatorGauge('total_requested_cpus', 0.))
+    self._total_requested_mem_mb = self.metrics.register(MutatorGauge('total_requested_mem_mb', 0.))
+    self._total_requested_disk_mb = self.metrics.register(
+        MutatorGauge('total_requested_disk_mb', 0.))
+
   # --- Public interface. ---
   def create_cluster(self, cluster_name, cluster_user, num_nodes, size=None, backup_id=None):
     """
@@ -146,12 +158,20 @@
       if not cluster_user:
         raise self.InvalidUser('Invalid user name: %s' % cluster_user)
 
-      if int(num_nodes) <= 0:
+      num_nodes = int(num_nodes)
+      if num_nodes <= 0:
         raise ValueError("Invalid number of cluster nodes: %s" % num_nodes)
 
       resources = parse_size(size)
       log.info("Requested resources per instance for cluster %s: %s" % (resources, cluster_name))
 
+      self._total_requested_cpus.write(
+          self._total_requested_cpus.read() + resources['cpus'] * num_nodes)
+      self._total_requested_mem_mb.write(
+          self._total_requested_mem_mb.read() + resources['mem'].as_(Data.MB) * num_nodes)
+      self._total_requested_disk_mb.write(
+          self._total_requested_disk_mb.read() + resources['disk'].as_(Data.MB) * num_nodes)
+
       self._state.clusters.add(cluster_name)
       self._state_provider.dump_scheduler_state(self._state)
 
@@ -161,7 +181,7 @@
           cluster_name,
           cluster_user,
           self._password_box.encrypt(password),
-          int(num_nodes),
+          num_nodes,
           cpus=resources['cpus'],
           mem=resources['mem'],
           disk=resources['disk'],
@@ -186,6 +206,8 @@
           executor_environ=self._executor_environ,
           framework_role=self._framework_role)
 
+      self._cluster_count.increment()
+
       return get_cluster_path(self._discover_zk_url, cluster_name), password
 
   def delete_cluster(self, cluster_name, password):
@@ -204,6 +226,14 @@
       launcher.kill(password)
       log.info("Attempted to kill cluster %s" % cluster_name)
 
+      self._cluster_count.decrement()
+      cluster_info = launcher.cluster_info
+      self._total_requested_cpus.write(self._total_requested_cpus.read() - cluster_info.total_cpus)
+      self._total_requested_mem_mb.write(
+          self._total_requested_mem_mb.read() - cluster_info.total_mem_mb)
+      self._total_requested_disk_mb.write(
+          self._total_requested_disk_mb.read() - cluster_info.total_disk_mb)
+
       return get_cluster_path(self._discover_zk_url, cluster_name)
 
   @property
@@ -289,6 +319,16 @@
           self._executor_environ,
           self._framework_role)
 
+      # Recover metrics from restored state.
+      self._cluster_count.increment()
+
+      cluster_info = self._launchers[cluster.name].cluster_info
+      self._total_requested_cpus.write(self._total_requested_cpus.read() + cluster_info.total_cpus)
+      self._total_requested_mem_mb.write(
+          self._total_requested_mem_mb.read() + cluster_info.total_mem_mb)
+      self._total_requested_disk_mb.write(
+          self._total_requested_disk_mb.read() + cluster_info.total_disk_mb)
+
     log.info("Recovered %s clusters" % len(self._launchers))
 
   @logged
@@ -409,7 +449,7 @@
     try:
       resources_ = json.loads(size)
       resources = dict(
-        cpus=resources_['cpus'],
+        cpus=float(resources_['cpus']),
         mem=parse_data(resources_['mem']),
         disk=parse_data(resources_['disk']))
     except (TypeError, KeyError, ValueError, InvalidData):
diff --git a/setup.py b/setup.py
index ab01bf7..db81925 100644
--- a/setup.py
+++ b/setup.py
@@ -52,6 +52,7 @@
         make_commons_requirement('http'),
         make_commons_requirement('lang'),
         make_commons_requirement('log'),
+        make_commons_requirement('metrics'),
         make_commons_requirement('quantity'),
         make_commons_requirement('zookeeper'),
     ],
diff --git a/tests/scheduler/test_http.py b/tests/scheduler/test_http.py
index 9c814a8..90cb678 100644
--- a/tests/scheduler/test_http.py
+++ b/tests/scheduler/test_http.py
@@ -9,6 +9,7 @@
 
 import pytest
 from webtest import AppError, TestApp
+from twitter.common.metrics import MetricSampler, RootMetrics
 
 
 MYSOS_MODULE = 'mysos.scheduler'
@@ -44,7 +45,8 @@
 
   def setUp(self):
     self._scheduler = FakeScheduler()
-    self._app = TestApp(MysosServer(self._scheduler, self.web_assets_dir).app)
+    self._app = TestApp(
+        MysosServer(self._scheduler, self.web_assets_dir, MetricSampler(RootMetrics())).app)
 
   def test_create_cluster_successful(self):
     response = ('test_cluster_url', 'passwordfortestcluster')
diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py
index 0beb182..58ea8a3 100644
--- a/tests/scheduler/test_scheduler.py
+++ b/tests/scheduler/test_scheduler.py
@@ -18,7 +18,8 @@
 from kazoo.handlers.threading import SequentialThreadingHandler
 import mesos.interface.mesos_pb2 as mesos_pb2
 from twitter.common import log
-from twitter.common.quantity import Amount, Time
+from twitter.common.metrics import RootMetrics
+from twitter.common.quantity import Amount, Data, Time
 from zake.fake_client import FakeClient
 from zake.fake_storage import FakeStorage
 
@@ -203,3 +204,37 @@
     # a 'Filters' object.
     assert (self._driver.method_calls["declineOffer"][0][0][1].refuse_seconds ==
         INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION.as_(Time.SECONDS))
+
+  def test_scheduler_metrics(self):
+    scheduler_key = gen_encryption_key()
+
+    scheduler = MysosScheduler(
+      self._state,
+      self._state_provider,
+      self._framework_user,
+      "./executor.pex",
+      "cmd.sh",
+      self._zk_client,
+      self._zk_url,
+      Amount(5, Time.SECONDS),
+      "/etc/mysos/admin_keyfile.yml",
+      scheduler_key)
+
+    RootMetrics().register_observable('scheduler', scheduler)
+
+    scheduler.registered(self._driver, self._framework_id, object())
+    _, password = scheduler.create_cluster("cluster1", "mysql_user", 3)
+
+    sample = RootMetrics().sample()
+    assert sample['scheduler.cluster_count'] == 1
+    assert sample['scheduler.total_requested_mem_mb'] == DEFAULT_TASK_MEM.as_(Data.MB) * 3
+    assert sample['scheduler.total_requested_disk_mb'] == DEFAULT_TASK_DISK.as_(Data.MB) * 3
+    assert sample['scheduler.total_requested_cpus'] == DEFAULT_TASK_CPUS * 3
+
+    scheduler.delete_cluster("cluster1", password)
+
+    sample = RootMetrics().sample()
+    assert sample['scheduler.cluster_count'] == 0
+    assert sample['scheduler.total_requested_mem_mb'] == 0
+    assert sample['scheduler.total_requested_disk_mb'] == 0
+    assert sample['scheduler.total_requested_cpus'] == 0