Fix a bug that causes terminal status update not getting sent.
Specifically:
- Wait 5 seconds after sending the status before the executor terminates itself.
- Make sure task_runner.stop() is always called no matter whether the task succeeds or where it fails.
diff --git a/mysos/executor/executor.py b/mysos/executor/executor.py
index 122bc36..c3d17ff 100644
--- a/mysos/executor/executor.py
+++ b/mysos/executor/executor.py
@@ -1,4 +1,6 @@
import json
+import sys
+from threading import Event
import traceback
from mysos.common.decorators import logged
@@ -9,6 +11,7 @@
import mesos.interface.mesos_pb2 as mesos_pb2
from twitter.common import log
from twitter.common.concurrent import defer
+from twitter.common.quantity import Amount, Time
class MysosExecutor(Executor):
@@ -16,6 +19,8 @@
MysosExecutor is a fine-grained executor, i.e., one executor executes a single task.
"""
+ STOP_WAIT = Amount(5, Time.SECONDS)
+
def __init__(self, runner_provider, sandbox):
"""
:param runner_provider: An implementation of TaskRunnerProvider.
@@ -27,6 +32,8 @@
self._killed = False # True if the executor's singleton task is killed by the scheduler.
self._sandbox = sandbox
+ self._terminated = Event() # Set when the runner has terminated.
+
# --- Mesos methods. ---
@logged
def registered(self, driver, executorInfo, frameworkInfo, slaveInfo):
@@ -81,21 +88,24 @@
log.error(traceback.format_exc())
# Send TASK_LOST for unknown errors.
self._send_update(task.task_id.value, mesos_pb2.TASK_LOST)
-
- # Wait for the task's return code (when it terminates).
- try:
- returncode = self._runner.join()
- # Regardless of the return code, if '_runner' terminates, it failed!
- log.error("Task process terminated with return code %s" % returncode)
- except TaskError as e:
- log.error("Task terminated: %s" % e)
-
- if self._killed:
- self._send_update(task.task_id.value, mesos_pb2.TASK_KILLED)
else:
- self._send_update(task.task_id.value, mesos_pb2.TASK_FAILED)
-
- self._kill()
+ # Wait for the task's return code (when it terminates).
+ try:
+ returncode = self._runner.join()
+ # If '_runner' terminates, it has either failed or been killed.
+ log.warn("Task process terminated with return code %s" % returncode)
+ except TaskError as e:
+ log.error("Task terminated: %s" % e)
+ finally:
+ if self._killed:
+ self._send_update(task.task_id.value, mesos_pb2.TASK_KILLED)
+ else:
+ self._send_update(task.task_id.value, mesos_pb2.TASK_FAILED)
+ self._terminated.set()
+ finally:
+ # No matter what happens above, when we reach here the executor has no task to run so it
+ # should just commit seppuku.
+ self._kill()
@logged
def frameworkMessage(self, driver, message):
@@ -118,37 +128,37 @@
'epoch': master_epoch, # Send the epoch back without parsing it.
'position': position
}))
- except TaskError as e:
- # Log the error and do not reply to the framework.
+ except Exception as e:
log.error("Committing suicide due to failure to process framework message: %s" % e)
+ log.error(traceback.format_exc())
self._kill()
@logged
def killTask(self, driver, taskId):
# Killing the task also kills the executor because there is one task per executor.
log.info("Asked to kill task %s" % taskId.value)
- self._killed = True
+
self._kill()
def _kill(self):
if self._runner:
+ self._killed = True
self._runner.stop() # It could be already stopped. If so, self._runner.stop() is a no-op.
+ self._terminated.wait(sys.maxint)
assert self._driver
# TODO(jyx): Fix https://issues.apache.org/jira/browse/MESOS-243.
- self._driver.stop()
+ defer(lambda: self._driver.stop(), delay=self.STOP_WAIT)
@logged
def shutdown(self, driver):
log.info("Asked to shut down")
- self._killed = True
self._kill()
@logged
def error(self, driver, message):
log.error("Shutting down due to error: %s" % message)
- self._killed = True
self._kill()
def _send_update(self, task_id, state, message=None):