Merge pull request #52 from xujyan/xujyan/metrics

Add more scheduler metrics.
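
The new gauges live on a nested Metrics namespace inside MysosScheduler and cover framework
registration, uptime, terminal task states, and offer handling. As a rough usage sketch (not part
of the diff), assuming the twitter.common.metrics RootMetrics setup that the tests below use, the
gauges can be read back as a flat sample keyed by '<observable name>.<gauge name>':

    # Sketch only: 'scheduler' is an already-constructed MysosScheduler; constructor
    # arguments are omitted here because they are not relevant to sampling.
    from twitter.common.metrics import RootMetrics

    RootMetrics().register_observable('scheduler', scheduler)

    sample = RootMetrics().sample()
    sample['scheduler.framework_registered']  # 1 after registered()/reregistered(), 0 after disconnected()
    sample['scheduler.resource_offers']        # increased by len(offers) in resourceOffers()
    sample['scheduler.tasks_failed']           # incremented on TASK_FAILED status updates
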
diff --git a/mysos/scheduler/elector.py b/mysos/scheduler/elector.py
index fa55bbe..4d4c08b 100644
--- a/mysos/scheduler/elector.py
+++ b/mysos/scheduler/elector.py
@@ -172,8 +172,9 @@
       return
 
     self._master_callback(self._master)  # Invoke the callback from the elector thread.
-    log.info("Stopping the elector thread for cluster %s because the election has completed" %
-        self._cluster_name)
+    log.info(
+        "Stopping the elector thread for cluster %s (epoch %s) because the election has completed" %
+        (self._cluster_name, self._epoch))
 
   def _elect(self, timedout=False):
     """
diff --git a/mysos/scheduler/launcher.py b/mysos/scheduler/launcher.py
index 7a5908e..87923f5 100644
--- a/mysos/scheduler/launcher.py
+++ b/mysos/scheduler/launcher.py
@@ -531,6 +531,11 @@
         log.info("Received framework message '%s' from task %s (%s) when there is no pending "
             "election" % (message, task_id, slave_id))
 
+  def stop(self):
+    """Called when the launcher is being shut down (due to removal of the cluster)."""
+    if self._elector:
+      self._elector.abort()
+
 
 # --- Utility methods. ---
 def create_resources(cpus, mem, disk, ports, role='*'):
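
The new launcher.stop() hook lets the scheduler tear down a cluster's background election when it
discards the launcher; the actual call site is in the scheduler.py hunk below, where a terminated
launcher is stopped and then removed from self._launchers. A minimal sketch of that shutdown path
(names assumed from that hunk, not new API):

    # Sketch only: how a terminated launcher is expected to be cleaned up by the scheduler.
    launcher = self._launchers[cluster_name]
    if launcher.terminated:
      launcher.stop()                     # calls self._elector.abort() if an election is pending
      del self._launchers[cluster_name]
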
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index 52986ae..b1cfe5c 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -1,4 +1,5 @@
 from collections import OrderedDict
+from datetime import datetime
 import json
 import posixpath
 import random
@@ -17,7 +18,7 @@
 import mesos.interface.mesos_pb2 as mesos_pb2
 from twitter.common import log
 from twitter.common.collections.orderedset import OrderedSet
-from twitter.common.metrics import AtomicGauge, MutatorGauge, Observable
+from twitter.common.metrics import AtomicGauge, LambdaGauge, MutatorGauge, Observable
 from twitter.common.quantity import Amount, Data, Time
 from twitter.common.quantity.parse_simple import InvalidData, parse_data
 
@@ -41,6 +42,8 @@
   class InvalidUser(Error): pass
   class ServiceUnavailable(Error): pass
 
+  class Metrics(object): pass  # Used as a namespace nested in MysosScheduler for metrics.
+
   def __init__(
       self,
       state,
@@ -115,18 +118,48 @@
     self.stopped = threading.Event()  # An event set when the scheduler is stopped.
     self.connected = threading.Event()  # An event set when the scheduler is first connected to
                                         # Mesos. The scheduler tolerates later disconnections.
+    self._setup_metrics()
 
-    self._cluster_count = self.metrics.register(AtomicGauge('cluster_count', 0))
+  def _setup_metrics(self):
+    self._metrics = self.Metrics()
+
+    self._metrics.cluster_count = self.metrics.register(AtomicGauge('cluster_count', 0))
 
     # Total resources requested by the scheduler's clients. When a cluster is created its resources
     # are added to the total; when it's deleted its resources are subtracted from the total.
     # NOTE: These are 'requested' resources, independent of the resources offered by Mesos or
     # allocated to or used by Mysos tasks running on the Mesos cluster.
-    self._total_requested_cpus = self.metrics.register(MutatorGauge('total_requested_cpus', 0.))
-    self._total_requested_mem_mb = self.metrics.register(MutatorGauge('total_requested_mem_mb', 0.))
-    self._total_requested_disk_mb = self.metrics.register(
+    self._metrics.total_requested_cpus = self.metrics.register(
+        MutatorGauge('total_requested_cpus', 0.))
+    self._metrics.total_requested_mem_mb = self.metrics.register(
+        MutatorGauge('total_requested_mem_mb', 0.))
+    self._metrics.total_requested_disk_mb = self.metrics.register(
         MutatorGauge('total_requested_disk_mb', 0.))
 
+    # 1: registered; 0: not registered.
+    self._metrics.framework_registered = self.metrics.register(
+        MutatorGauge('framework_registered', 0))
+
+    self._startup_time = datetime.utcnow()
+    self._metrics.uptime = self.metrics.register(
+        LambdaGauge('uptime', lambda: (datetime.utcnow() - self._startup_time).total_seconds()))
+
+    # Counters for tasks in terminal states.
+    self._metrics.tasks_lost = self.metrics.register(AtomicGauge('tasks_lost', 0))
+    self._metrics.tasks_finished = self.metrics.register(AtomicGauge('tasks_finished', 0))
+    self._metrics.tasks_failed = self.metrics.register(AtomicGauge('tasks_failed', 0))
+    self._metrics.tasks_killed = self.metrics.register(AtomicGauge('tasks_killed', 0))
+
+    self._metrics.resource_offers = self.metrics.register(AtomicGauge('resource_offers', 0))
+    self._metrics.offers_incompatible_role = self.metrics.register(
+        AtomicGauge('offers_incompatible_role', 0))
+
+    self._metrics.tasks_launched = self.metrics.register(AtomicGauge('tasks_launched', 0))
+
+    # 'offers_unused' counts offers declined because the scheduler is idle or the resources
+    # don't fit, i.e., 'resource_offers' - 'tasks_launched' - 'offers_incompatible_role'.
+    self._metrics.offers_unused = self.metrics.register(AtomicGauge('offers_unused', 0))
+
   # --- Public interface. ---
   def create_cluster(
         self,
@@ -179,12 +212,12 @@
       resources = parse_size(size)
       log.info("Requested resources per instance for cluster %s: %s" % (resources, cluster_name))
 
-      self._total_requested_cpus.write(
-          self._total_requested_cpus.read() + resources['cpus'] * num_nodes)
-      self._total_requested_mem_mb.write(
-          self._total_requested_mem_mb.read() + resources['mem'].as_(Data.MB) * num_nodes)
-      self._total_requested_disk_mb.write(
-          self._total_requested_disk_mb.read() + resources['disk'].as_(Data.MB) * num_nodes)
+      self._metrics.total_requested_cpus.write(
+          self._metrics.total_requested_cpus.read() + resources['cpus'] * num_nodes)
+      self._metrics.total_requested_mem_mb.write(
+          self._metrics.total_requested_mem_mb.read() + resources['mem'].as_(Data.MB) * num_nodes)
+      self._metrics.total_requested_disk_mb.write(
+          self._metrics.total_requested_disk_mb.read() + resources['disk'].as_(Data.MB) * num_nodes)
 
       self._state.clusters.add(cluster_name)
       self._state_provider.dump_scheduler_state(self._state)
@@ -224,7 +257,7 @@
           executor_source_prefix=self._executor_source_prefix,
           framework_role=self._framework_role)
 
-      self._cluster_count.increment()
+      self._metrics.cluster_count.increment()
 
       return get_cluster_path(self._discover_zk_url, cluster_name), cluster_password
 
@@ -244,13 +277,14 @@
       launcher.kill(password)
       log.info("Attempted to kill cluster %s" % cluster_name)
 
-      self._cluster_count.decrement()
+      self._metrics.cluster_count.decrement()
       cluster_info = launcher.cluster_info
-      self._total_requested_cpus.write(self._total_requested_cpus.read() - cluster_info.total_cpus)
-      self._total_requested_mem_mb.write(
-          self._total_requested_mem_mb.read() - cluster_info.total_mem_mb)
-      self._total_requested_disk_mb.write(
-          self._total_requested_disk_mb.read() - cluster_info.total_disk_mb)
+      self._metrics.total_requested_cpus.write(
+          self._metrics.total_requested_cpus.read() - cluster_info.total_cpus)
+      self._metrics.total_requested_mem_mb.write(
+          self._metrics.total_requested_mem_mb.read() - cluster_info.total_mem_mb)
+      self._metrics.total_requested_disk_mb.write(
+          self._metrics.total_requested_disk_mb.read() - cluster_info.total_disk_mb)
 
       if launcher.terminated:
         log.info("Deleting the launcher for cluster %s directly because the cluster has already "
@@ -298,6 +332,8 @@
       self._stop()
       return
 
+    self._metrics.framework_registered.write(1)
+
     self.connected.set()
 
   def _recover(self):
@@ -344,30 +380,34 @@
           framework_role=self._framework_role)
 
       # Recover metrics from restored state.
-      self._cluster_count.increment()
+      self._metrics.cluster_count.increment()
 
       cluster_info = self._launchers[cluster.name].cluster_info
-      self._total_requested_cpus.write(self._total_requested_cpus.read() + cluster_info.total_cpus)
-      self._total_requested_mem_mb.write(
-          self._total_requested_mem_mb.read() + cluster_info.total_mem_mb)
-      self._total_requested_disk_mb.write(
-          self._total_requested_disk_mb.read() + cluster_info.total_disk_mb)
+      self._metrics.total_requested_cpus.write(
+          self._metrics.total_requested_cpus.read() + cluster_info.total_cpus)
+      self._metrics.total_requested_mem_mb.write(
+          self._metrics.total_requested_mem_mb.read() + cluster_info.total_mem_mb)
+      self._metrics.total_requested_disk_mb.write(
+          self._metrics.total_requested_disk_mb.read() + cluster_info.total_disk_mb)
 
     log.info("Recovered %s clusters" % len(self._launchers))
 
   @logged
   def reregistered(self, driver, masterInfo):
     self._driver = driver
+
+    self._metrics.framework_registered.write(1)
     self.connected.set()
     # TODO(jyx): Reconcile tasks.
 
   @logged
   def disconnected(self, driver):
-    pass
+    self._metrics.framework_registered.write(0)
 
   @logged
   def resourceOffers(self, driver, offers):
-    log.info('Got %d resource offers' % len(offers))
+    log.debug('Got %d resource offers' % len(offers))
+    self._metrics.resource_offers.add(len(offers))
 
     with self._lock:
       # Current scheduling algorithm: randomly pick an offer and loop through the list of launchers
@@ -384,29 +424,33 @@
           try:
             task_id, _ = launcher.launch(offer)
           except MySQLClusterLauncher.IncompatibleRoleError as e:
+            log.info("Declining offer %s for %s because '%s'" % (
+                offer.id.value, INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION, e))
             # This "error" is not severe and we expect this to occur frequently only when Mysos
             # first joins the cluster. For a running Mesos cluster this should be somewhat rare
             # because we refuse the offer "forever".
             filters = mesos_pb2.Filters()
             filters.refuse_seconds = INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION.as_(Time.SECONDS)
+
+            self._metrics.offers_incompatible_role.increment()
             break  # No need to check with other launchers.
           if task_id:
+            self._metrics.tasks_launched.increment()
+
             self._tasks[task_id] = launcher.cluster_name
             # No need to check with other launchers. 'filters' remains unset.
             break
         if task_id:
           break  # Some launcher has used this offer. Move on to the next one.
 
-        if filters:
-          log.info("Declining offer %s for %s because '%s'" % (
-              offer.id.value, INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION, e))
-        else:
-          log.info("Declining offer %s because no launcher accepted this offer" % offer.id.value)
+        if not filters:
+          log.debug("Declining unused offer %s because no launcher accepted this offer: %s" % (
+              offer.id.value, offer))
           # Mesos scheduler Python binding doesn't deal with filters='None' properly.
           # See https://issues.apache.org/jira/browse/MESOS-2567.
           filters = mesos_pb2.Filters()
+          self._metrics.offers_unused.increment()
 
-        log.debug(offer)
         self._driver.declineOffer(offer.id, filters)
 
   @logged
@@ -425,6 +469,18 @@
         log.error("Status update failed due to launcher error: %s" % e.message)
         self._stop()
 
+      # Update metrics.
+      # TODO(xujyan): This doesn't rule out duplicates, etc. We can consider updating these metrics
+      # in the launcher.
+      if status.state == mesos_pb2.TASK_FINISHED:
+        self._metrics.tasks_finished.increment()
+      elif status.state == mesos_pb2.TASK_FAILED:
+        self._metrics.tasks_failed.increment()
+      elif status.state == mesos_pb2.TASK_KILLED:
+        self._metrics.tasks_killed.increment()
+      elif status.state == mesos_pb2.TASK_LOST:
+        self._metrics.tasks_lost.increment()
+
       if launcher.terminated:
         log.info("Deleting the launcher for cluster %s because the cluster has terminated" %
                  launcher.cluster_name)
@@ -434,6 +490,7 @@
     assert launcher.terminated
     self._state.clusters.discard(launcher.cluster_name)
     self._state_provider.dump_scheduler_state(self._state)
+    self._launchers[launcher.cluster_name].stop()
     del self._launchers[launcher.cluster_name]
 
   @logged
diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py
index 061f915..6e6ced0 100644
--- a/tests/scheduler/test_scheduler.py
+++ b/tests/scheduler/test_scheduler.py
@@ -194,6 +194,9 @@
         "/etc/mysos/admin_keyfile.yml",
         gen_encryption_key(),
         framework_role='mysos')  # Require 'mysos' but the resources are in '*'.
+
+    RootMetrics().register_observable('scheduler', scheduler1)
+
     scheduler1.registered(self._driver, self._framework_id, object())
     scheduler1.create_cluster("cluster1", "mysql_user", 3)
     scheduler1.resourceOffers(self._driver, [self._offer])
@@ -205,6 +208,9 @@
     assert (self._driver.method_calls["declineOffer"][0][0][1].refuse_seconds ==
         INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION.as_(Time.SECONDS))
 
+    sample = RootMetrics().sample()
+    assert sample['scheduler.offers_incompatible_role'] == 1
+
   def test_scheduler_metrics(self):
     scheduler_key = gen_encryption_key()
 
@@ -223,6 +229,10 @@
     RootMetrics().register_observable('scheduler', scheduler)
 
     scheduler.registered(self._driver, self._framework_id, object())
+
+    sample = RootMetrics().sample()
+    assert sample['scheduler.framework_registered'] == 1
+
     scheduler.create_cluster(
         "cluster1", "mysql_user", 3, cluster_password='test_password')
 
@@ -232,6 +242,24 @@
     assert sample['scheduler.total_requested_disk_mb'] == DEFAULT_TASK_DISK.as_(Data.MB) * 3
     assert sample['scheduler.total_requested_cpus'] == DEFAULT_TASK_CPUS * 3
 
+    scheduler.resourceOffers(self._driver, [self._offer])
+    sample = RootMetrics().sample()
+    assert sample['scheduler.resource_offers'] == 1
+    assert sample['scheduler.tasks_launched'] == 1
+
+    status = mesos_pb2.TaskStatus()
+    status.state = mesos_pb2.TASK_RUNNING
+    status.slave_id.value = self._offer.slave_id.value
+    status.task_id.value = 'mysos-cluster1-0'
+
+    scheduler.statusUpdate(self._driver, status)
+
+    status.state = mesos_pb2.TASK_FAILED
+    scheduler.statusUpdate(self._driver, status)
+
+    sample = RootMetrics().sample()
+    assert sample['scheduler.tasks_failed'] == 1
+
     scheduler.delete_cluster("cluster1", 'test_password')
 
     sample = RootMetrics().sample()