Add scheduler metrics related resource offers.

-  And demote its logging to 'DEBUG' level so that long-running scheduler not in verbose mode doesn't print out excessive amount of logs about offers which can exceed its disk quota if there's any.
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index 8dc55ab..b1cfe5c 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -150,6 +150,16 @@
     self._metrics.tasks_failed = self.metrics.register(AtomicGauge('tasks_failed', 0))
     self._metrics.tasks_killed = self.metrics.register(AtomicGauge('tasks_killed', 0))
 
+    self._metrics.resource_offers = self.metrics.register(AtomicGauge('resource_offers', 0))
+    self._metrics.offers_incompatible_role = self.metrics.register(
+        AtomicGauge('offers_incompatible_role', 0))
+
+    self._metrics.tasks_launched = self.metrics.register(AtomicGauge('tasks_launched', 0))
+
+    # 'offers_unused' are due to idle scheduler or resources don't fit, i.e.,
+    # 'resource_offers' - 'tasks_launched' - 'offers_incompatible_role'.
+    self._metrics.offers_unused = self.metrics.register(AtomicGauge('offers_unused', 0))
+
   # --- Public interface. ---
   def create_cluster(
         self,
@@ -396,7 +406,8 @@
 
   @logged
   def resourceOffers(self, driver, offers):
-    log.info('Got %d resource offers' % len(offers))
+    log.debug('Got %d resource offers' % len(offers))
+    self._metrics.resource_offers.add(len(offers))
 
     with self._lock:
       # Current scheduling algorithm: randomly pick an offer and loop through the list of launchers
@@ -413,29 +424,33 @@
           try:
             task_id, _ = launcher.launch(offer)
           except MySQLClusterLauncher.IncompatibleRoleError as e:
+            log.info("Declining offer %s for %s because '%s'" % (
+                offer.id.value, INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION, e))
             # This "error" is not severe and we expect this to occur frequently only when Mysos
             # first joins the cluster. For a running Mesos cluster this should be somewhat rare
             # because we refuse the offer "forever".
             filters = mesos_pb2.Filters()
             filters.refuse_seconds = INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION.as_(Time.SECONDS)
+
+            self._metrics.offers_incompatible_role.increment()
             break  # No need to check with other launchers.
           if task_id:
+            self._metrics.tasks_launched.increment()
+
             self._tasks[task_id] = launcher.cluster_name
             # No need to check with other launchers. 'filters' remains unset.
             break
         if task_id:
           break  # Some launcher has used this offer. Move on to the next one.
 
-        if filters:
-          log.info("Declining offer %s for %s because '%s'" % (
-              offer.id.value, INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION, e))
-        else:
-          log.info("Declining offer %s because no launcher accepted this offer" % offer.id.value)
+        if not filters:
+          log.debug("Declining unused offer %s because no launcher accepted this offer: %s" % (
+              offer.id.value, offer))
           # Mesos scheduler Python binding doesn't deal with filters='None' properly.
           # See https://issues.apache.org/jira/browse/MESOS-2567.
           filters = mesos_pb2.Filters()
+          self._metrics.offers_unused.increment()
 
-        log.debug(offer)
         self._driver.declineOffer(offer.id, filters)
 
   @logged
diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py
index 3db187a..6e6ced0 100644
--- a/tests/scheduler/test_scheduler.py
+++ b/tests/scheduler/test_scheduler.py
@@ -194,6 +194,9 @@
         "/etc/mysos/admin_keyfile.yml",
         gen_encryption_key(),
         framework_role='mysos')  # Require 'mysos' but the resources are in '*'.
+
+    RootMetrics().register_observable('scheduler', scheduler1)
+
     scheduler1.registered(self._driver, self._framework_id, object())
     scheduler1.create_cluster("cluster1", "mysql_user", 3)
     scheduler1.resourceOffers(self._driver, [self._offer])
@@ -205,6 +208,9 @@
     assert (self._driver.method_calls["declineOffer"][0][0][1].refuse_seconds ==
         INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION.as_(Time.SECONDS))
 
+    sample = RootMetrics().sample()
+    assert sample['scheduler.offers_incompatible_role'] == 1
+
   def test_scheduler_metrics(self):
     scheduler_key = gen_encryption_key()
 
@@ -237,6 +243,9 @@
     assert sample['scheduler.total_requested_cpus'] == DEFAULT_TASK_CPUS * 3
 
     scheduler.resourceOffers(self._driver, [self._offer])
+    sample = RootMetrics().sample()
+    assert sample['scheduler.resource_offers'] == 1
+    assert sample['scheduler.tasks_launched'] == 1
 
     status = mesos_pb2.TaskStatus()
     status.state = mesos_pb2.TASK_RUNNING