Put scheduler callbacks that need to access scheduler state under lock guards.

Review: https://reviews.apache.org/r/37063
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index e5d403e..107dc5a 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -36,6 +36,11 @@
 
 
 class MysosScheduler(mesos.interface.Scheduler, Observable):
+  """
+    Thread-safety:
+      The scheduler instance is accessed by the Mesos driver and API requests so methods that access
+      scheduler state are synchronized.
+  """
 
   class Error(Exception): pass
 
@@ -325,27 +330,28 @@
   # --- Mesos methods. ---
   @logged
   def registered(self, driver, frameworkId, masterInfo):
-    self._driver = driver
-    self._state.framework_info.id.value = frameworkId.value
-    self._state_provider.dump_scheduler_state(self._state)
+    with self._lock:
+      self._driver = driver
+      self._state.framework_info.id.value = frameworkId.value
+      self._state_provider.dump_scheduler_state(self._state)
 
-    # Recover only after the scheduler is connected because it needs '_driver' to be assigned. This
-    # is blocking the scheduler driver thread but we do want further messages to be blocked until
-    # the scheduler state is fully recovered.
-    # TODO(jyx): If performance becomes an issue, we can also restore all the state data while the
-    # driver is connecting and proceed to recover all the internal state objects after the driver is
-    # connected.
-    try:
-      self._recover()
-    except Exception as e:
-      log.error("Stopping scheduler because: %s" % e)
-      log.error(traceback.format_exc())
-      self._stop()
-      return
+      # Recover only after the scheduler is connected because it needs '_driver' to be assigned.
+      # This is blocking the scheduler driver thread but we do want further messages to be blocked
+      # until the scheduler state is fully recovered.
+      # TODO(jyx): If performance becomes an issue, we can also restore all the state data while the
+      # driver is connecting and proceed to recover all the internal state objects after the driver
+      # is connected.
+      try:
+        self._recover()
+      except Exception as e:
+        log.error("Stopping scheduler because: %s" % e)
+        log.error(traceback.format_exc())
+        self._stop()
+        return
 
-    self._metrics.framework_registered.write(1)
+      self._metrics.framework_registered.write(1)
 
-    self.connected.set()
+      self.connected.set()
 
   def _recover(self):
     """
@@ -417,10 +423,10 @@
 
   @logged
   def resourceOffers(self, driver, offers):
-    log.debug('Got %d resource offers' % len(offers))
-    self._metrics.resource_offers.add(len(offers))
-
     with self._lock:
+      log.debug('Got %d resource offers' % len(offers))
+      self._metrics.resource_offers.add(len(offers))
+
       # Current scheduling algorithm: randomly pick an offer and loop through the list of launchers
       # until one decides to use this offer to launch a task.
       # It's possible to launch multiple tasks on the same Mesos slave (in different batches of
@@ -506,11 +512,12 @@
 
   @logged
   def frameworkMessage(self, driver, executorId, slaveId, message):
-    log.info('Received framework message %s' % message)
-    task_id = executorId.value  # task_id == executor_id in Mysos.
+    with self._lock:
+      log.info('Received framework message %s' % message)
+      task_id = executorId.value  # task_id == executor_id in Mysos.
 
-    launcher = self._get_launcher_by_task_id(task_id)
-    launcher.framework_message(task_id, slaveId.value, message)
+      launcher = self._get_launcher_by_task_id(task_id)
+      launcher.framework_message(task_id, slaveId.value, message)
 
   @logged
   def slaveLost(self, driver, slaveId):