Put scheduler callbacks that need to access scheduler state under lock guards.
Review: https://reviews.apache.org/r/37063
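
The pattern applied throughout is a single threading.Lock created in the
scheduler's constructor, with every callback that touches shared state wrapped
in a "with self._lock:" guard. A minimal, self-contained sketch of the idea
(the constructor and statusUpdate body here are illustrative, not the actual
MysosScheduler code):

    import threading

    class Scheduler(object):
      def __init__(self):
        # One lock serializes the Mesos driver thread and the API threads.
        self._lock = threading.Lock()
        self._tasks = {}  # Shared scheduler state.

      def statusUpdate(self, driver, status):
        # Each callback takes the lock before reading or writing shared state.
        with self._lock:
          self._tasks[status.task_id.value] = status.state
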
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index e5d403e..107dc5a 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -36,6 +36,11 @@
 class MysosScheduler(mesos.interface.Scheduler, Observable):
+  """
+  Thread-safety:
+    The scheduler instance is accessed by both the Mesos driver thread and API request threads,
+    so methods that access scheduler state are synchronized.
+  """
 
   class Error(Exception): pass
@@ -325,27 +330,28 @@
   # --- Mesos methods. ---
   @logged
   def registered(self, driver, frameworkId, masterInfo):
-    self._driver = driver
-    self._state.framework_info.id.value = frameworkId.value
-    self._state_provider.dump_scheduler_state(self._state)
+    with self._lock:
+      self._driver = driver
+      self._state.framework_info.id.value = frameworkId.value
+      self._state_provider.dump_scheduler_state(self._state)
 
-    # Recover only after the scheduler is connected because it needs '_driver' to be assigned. This
-    # is blocking the scheduler driver thread but we do want further messages to be blocked until
-    # the scheduler state is fully recovered.
-    # TODO(jyx): If performance becomes an issue, we can also restore all the state data while the
-    # driver is connecting and proceed to recover all the internal state objects after the driver is
-    # connected.
-    try:
-      self._recover()
-    except Exception as e:
-      log.error("Stopping scheduler because: %s" % e)
-      log.error(traceback.format_exc())
-      self._stop()
-      return
+      # Recover only after the scheduler is connected because it needs '_driver' to be assigned.
+      # This blocks the scheduler driver thread, but we do want further messages to be blocked
+      # until the scheduler state is fully recovered.
+      # TODO(jyx): If performance becomes an issue, we can also restore all the state data while
+      # the driver is connecting and proceed to recover all the internal state objects after the
+      # driver is connected.
+      try:
+        self._recover()
+      except Exception as e:
+        log.error("Stopping scheduler because: %s" % e)
+        log.error(traceback.format_exc())
+        self._stop()
+        return
 
-    self._metrics.framework_registered.write(1)
+      self._metrics.framework_registered.write(1)
 
-    self.connected.set()
+      self.connected.set()
 
   def _recover(self):
     """
@@ -417,10 +423,10 @@
   @logged
   def resourceOffers(self, driver, offers):
-    log.debug('Got %d resource offers' % len(offers))
-    self._metrics.resource_offers.add(len(offers))
-
     with self._lock:
+      log.debug('Got %d resource offers' % len(offers))
+      self._metrics.resource_offers.add(len(offers))
+
       # Current scheduling algorithm: randomly pick an offer and loop through the list of launchers
       # until one decides to use this offer to launch a task.
       # It's possible to launch multiple tasks on the same Mesos slave (in different batches of
@@ -506,11 +512,12 @@
   @logged
   def frameworkMessage(self, driver, executorId, slaveId, message):
-    log.info('Received framework message %s' % message)
-    task_id = executorId.value  # task_id == executor_id in Mysos.
+    with self._lock:
+      log.info('Received framework message %s' % message)
+      task_id = executorId.value  # task_id == executor_id in Mysos.
 
-    launcher = self._get_launcher_by_task_id(task_id)
-    launcher.framework_message(task_id, slaveId.value, message)
+      launcher = self._get_launcher_by_task_id(task_id)
+      launcher.framework_message(task_id, slaveId.value, message)
 
   @logged
   def slaveLost(self, driver, slaveId):
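
A note on the locking discipline: in the registered() hunk above, _recover()
and _stop() are called with the lock already held. With a plain
threading.Lock, a nested acquisition inside those helpers would deadlock the
driver thread, so either the private helpers must assume the caller holds the
lock, or the lock must be reentrant. A sketch of the reentrant variant
(threading.RLock is standard library; whether Mysos uses it or the
caller-holds-the-lock convention is not visible in this diff):

    import threading

    class Scheduler(object):
      def __init__(self):
        # An RLock lets the holding thread re-acquire it, so a locked public
        # method can call other locked methods without deadlocking itself.
        self._lock = threading.RLock()
        self._stopped = False

      def registered(self, driver, framework_id, master_info):
        with self._lock:
          try:
            self._recover()
          except Exception:
            self._stop()  # Re-acquires self._lock; safe because it is reentrant.

      def _recover(self):
        with self._lock:
          pass  # Restore persisted state here.

      def _stop(self):
        with self._lock:
          self._stopped = True
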