| # |
| # Copyright (C) 2018 Codethink Limited |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this library. If not, see <http://www.gnu.org/licenses/>. |
| # |
| # Authors: |
| # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> |
| # Jürg Billeter <juerg.billeter@codethink.co.uk> |
| # Tristan Maat <tristan.maat@codethink.co.uk> |
| |
| # System imports |
| import asyncio |
| import datetime |
| import enum |
| import io |
| import multiprocessing |
| import os |
| import signal |
| import sys |
| import time |
| import traceback |
| |
| # BuildStream toplevel imports |
| from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob |
| from ..._message import Message, MessageType, unconditional_messages |
| from ... import _signals, utils |
| from ..._platform import Platform |
| |
| from .jobpickler import pickle_child_job, unpickle_child_job |
| |
| |
| # Return code values shutdown of job handling child processes |
| # |
| @enum.unique |
| class _ReturnCode(enum.IntEnum): |
| OK = 0 |
| FAIL = 1 |
| PERM_FAIL = 2 |
| SKIPPED = 3 |
| |
| |
| def _call_on_waitpid_threadfun(running_loop, process, callback): |
| process.join() |
| running_loop.call_soon_threadsafe(callback, process.pid, process.exitcode) |
| |
| |
| def call_on_waitpid(running_loop, pid, callback): |
| import threading |
| t = threading.Thread( |
| target=_call_on_waitpid_threadfun, |
| args=(running_loop, pid, callback) |
| ) |
| t.start() |
| return t |
| |
| |
| # JobStatus: |
| # |
| # The job completion status, passed back through the |
| # complete callbacks. |
| # |
| @enum.unique |
| class JobStatus(enum.Enum): |
| # Job succeeded |
| OK = 0 |
| |
| # A temporary BstError was raised |
| FAIL = 1 |
| |
| # A SkipJob was raised |
| SKIPPED = 3 |
| |
| |
| # Used to distinguish between status messages and return values |
| class _Envelope(): |
| def __init__(self, message_type, message): |
| self.message_type = message_type |
| self.message = message |
| |
| |
| # Process class that doesn't call waitpid on its own. |
| # This prevents conflicts with the asyncio child watcher. |
| class Process(multiprocessing.Process): |
| # pylint: disable=attribute-defined-outside-init |
| def start(self): |
| self._popen = self._Popen(self) |
| self._sentinel = self._popen.sentinel |
| |
| |
| @enum.unique |
| class _MessageType(enum.Enum): |
| LOG_MESSAGE = 1 |
| ERROR = 2 |
| RESULT = 3 |
| CHILD_DATA = 4 |
| SUBCLASS_CUSTOM_MESSAGE = 5 |
| |
| |
| def _do_pickled_child_job(pickled, *child_args): |
| child_job = unpickle_child_job(pickled) |
| return child_job.child_action(*child_args) |
| |
| |
| # Job() |
| # |
| # The Job object represents a task that will run in parallel to the main |
| # process. It has some methods that are not implemented - they are meant for |
| # you to implement in a subclass. |
| # |
| # It has a close relationship with the ChildJob class, and it can be considered |
| # a two part solution: |
| # |
| # 1. A Job instance, which will create a ChildJob instance and arrange for |
| # childjob.child_process() to be executed in another process. |
| # 2. The created ChildJob instance, which does the actual work. |
| # |
| # This split makes it clear what data is passed to the other process and what |
| # is executed in which process. |
| # |
| # To set up a minimal new kind of Job, e.g. YourJob: |
| # |
| # 1. Create a YourJob class, inheriting from Job. |
| # 2. Create a YourChildJob class, inheriting from ChildJob. |
| # 3. Implement YourJob.create_child_job() and YourJob.parent_complete(). |
| # 4. Implement YourChildJob.child_process(). |
| # |
| # A Job instance and its ChildJob share a message queue. You may send custom |
| # messages to the main process using YourChildJob.send_message(). Such messages |
| # must be processed in YourJob.handle_message(), which you will also need to |
| # override for this purpose. |
| # |
| # Args: |
| # scheduler (Scheduler): The scheduler |
| # action_name (str): The queue action name |
| # logfile (str): A template string that points to the logfile |
| # that should be used - should contain {pid}. |
| # max_retries (int): The maximum number of retries |
| # |
| class Job(): |
| |
| def __init__(self, scheduler, action_name, logfile, *, max_retries=0): |
| |
| # |
| # Public members |
| # |
| self.name = None # The name of the job, set by the job's subclass |
| self.action_name = action_name # The action name for the Queue |
| self.child_data = None # Data to be sent to the main process |
| |
| # |
| # Private members |
| # |
| self._scheduler = scheduler # The scheduler |
| self._queue = None # A message passing queue |
| self._process = None # The Process object |
| self._watcher = None # Child process watcher |
| self._listening = False # Whether the parent is currently listening |
| self._suspended = False # Whether this job is currently suspended |
| self._max_retries = max_retries # Maximum number of automatic retries |
| self._result = None # Return value of child action in the parent |
| self._tries = 0 # Try count, for retryable jobs |
| self._terminated = False # Whether this job has been explicitly terminated |
| |
| self._logfile = logfile |
| self._message_unique_id = None |
| self._task_id = None |
| |
| # set_name() |
| # |
| # Sets the name of this job |
| def set_name(self, name): |
| self.name = name |
| |
| # start() |
| # |
| # Starts the job. |
| # |
| def start(self): |
| platform = Platform.get_platform() |
| self._queue = platform.make_queue() |
| |
| self._tries += 1 |
| self._parent_start_listening() |
| |
| child_job = self.create_child_job( # pylint: disable=assignment-from-no-return |
| self.action_name, |
| self._scheduler.context.messenger, |
| self._scheduler.context.logdir, |
| self._logfile, |
| self._max_retries, |
| self._tries, |
| self._message_unique_id, |
| self._task_id, |
| ) |
| |
| then = time.time() |
| pickled = pickle_child_job(child_job, self._scheduler.context) |
| now = time.time() |
| pickled.seek(0, io.SEEK_END) |
| self.message(MessageType.INFO, "pickled len: {:,}".format(pickled.tell())) |
| self.message(MessageType.INFO, "pickle time: {}s".format(round(now - then, 2))) |
| pickled.seek(0) |
| then = time.time() |
| self._process = Process( |
| target=_do_pickled_child_job, |
| args=[pickled, self._queue], |
| ) |
| now = time.time() |
| self.message(MessageType.INFO, "make process: {}s".format(round(now - then, 2))) |
| |
| # Block signals which are handled in the main process such that |
| # the child process does not inherit the parent's state, but the main |
| # process will be notified of any signal after we launch the child. |
| # |
| then = time.time() |
| with _signals.blocked([signal.SIGINT, signal.SIGTERM], ignore=False): |
| self._process.start() |
| now = time.time() |
| self.message(MessageType.INFO, "start process: {}s".format(round(now - then, 2))) |
| |
| # Wait for the child task to complete. |
| # |
| # This is a tricky part of python which doesnt seem to |
| # make it to the online docs: |
| # |
| # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance |
| # which is the default type of watcher, and the instance belongs to the |
| # "event loop policy" in use (so there is only one in the main process). |
| # |
| # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio |
| # loop, and will selectively reap any child pids which have been |
| # terminated. |
| # |
| # o At registration time, the process will immediately be checked with |
| # `os.waitpid()` and will be reaped immediately, before add_child_handler() |
| # returns. |
| # |
| # The self._parent_child_completed callback passed here will normally |
| # be called after the child task has been reaped with `os.waitpid()`, in |
| # an event loop callback. Otherwise, if the job completes too fast, then |
| # the callback is called immediately. |
| # |
| self._watcher = call_on_waitpid(self._scheduler.loop, self._process, self._parent_child_completed) |
| |
| # terminate() |
| # |
| # Politely request that an ongoing job terminate soon. |
| # |
| # This will send a SIGTERM signal to the Job process. |
| # |
| def terminate(self): |
| |
| # First resume the job if it's suspended |
| self.resume(silent=True) |
| |
| self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) |
| |
| # Make sure there is no garbage on the queue |
| self._parent_stop_listening() |
| |
| # Terminate the process using multiprocessing API pathway |
| self._process.terminate() |
| |
| self._terminated = True |
| |
| # get_terminated() |
| # |
| # Check if a job has been terminated. |
| # |
| # Returns: |
| # (bool): True in the main process if Job.terminate() was called. |
| # |
| def get_terminated(self): |
| return self._terminated |
| |
| # terminate_wait() |
| # |
| # Wait for terminated jobs to complete |
| # |
| # Args: |
| # timeout (float): Seconds to wait |
| # |
| # Returns: |
| # (bool): True if the process terminated cleanly, otherwise False |
| # |
| def terminate_wait(self, timeout): |
| |
| # Join the child process after sending SIGTERM |
| self._process.join(timeout) |
| return self._process.exitcode is not None |
| |
| # kill() |
| # |
| # Forcefully kill the process, and any children it might have. |
| # |
| def kill(self): |
| # Force kill |
| self.message(MessageType.WARN, |
| "{} did not terminate gracefully, killing".format(self.action_name)) |
| utils._kill_process_tree(self._process.pid) |
| |
| # suspend() |
| # |
| # Suspend this job. |
| # |
| def suspend(self): |
| if not self._suspended: |
| self.message(MessageType.STATUS, |
| "{} suspending".format(self.action_name)) |
| |
| try: |
| # Use SIGTSTP so that child processes may handle and propagate |
| # it to processes they start that become session leaders. |
| os.kill(self._process.pid, signal.SIGTSTP) |
| |
| # For some reason we receive exactly one suspend event for |
| # every SIGTSTP we send to the child process, even though the |
| # child processes are setsid(). We keep a count of these so we |
| # can ignore them in our event loop suspend_event(). |
| self._scheduler.internal_stops += 1 |
| self._suspended = True |
| except ProcessLookupError: |
| # ignore, process has already exited |
| pass |
| |
| # resume() |
| # |
| # Resume this suspended job. |
| # |
| def resume(self, silent=False): |
| if self._suspended: |
| if not silent and not self._scheduler.terminated: |
| self.message(MessageType.STATUS, |
| "{} resuming".format(self.action_name)) |
| |
| os.kill(self._process.pid, signal.SIGCONT) |
| self._suspended = False |
| |
| # set_message_unique_id() |
| # |
| # This is called by Job subclasses to set the plugin ID |
| # issuing the message (if an element is related to the Job). |
| # |
| # Args: |
| # unique_id (int): The id to be supplied to the Message() constructor |
| # |
| def set_message_unique_id(self, unique_id): |
| self._message_unique_id = unique_id |
| |
| # set_task_id() |
| # |
| # This is called by Job subclasses to set a plugin ID |
| # associated with the task at large (if any element is related |
| # to the task). |
| # |
| # This will only be used in the child process running the task. |
| # |
| # The task ID helps keep messages in the frontend coherent |
| # in the case that multiple plugins log in the context of |
| # a single task (e.g. running integration commands should appear |
| # in the frontend for the element being built, not the element |
| # running the integration commands). |
| # |
| # Args: |
| # task_id (int): The plugin identifier for this task |
| # |
| def set_task_id(self, task_id): |
| self._task_id = task_id |
| |
| # message(): |
| # |
| # Logs a message, this will be logged in the task's logfile and |
| # conditionally also be sent to the frontend. |
| # |
| # Args: |
| # message_type (MessageType): The type of message to send |
| # message (str): The message |
| # kwargs: Remaining Message() constructor arguments, note that you can |
| # override 'unique_id' this way. |
| # |
| def message(self, message_type, message, **kwargs): |
| kwargs['scheduler'] = True |
| unique_id = self._message_unique_id |
| if "unique_id" in kwargs: |
| unique_id = kwargs["unique_id"] |
| del kwargs["unique_id"] |
| self._scheduler.context.messenger.message( |
| Message(unique_id, message_type, message, **kwargs)) |
| |
| ####################################################### |
| # Abstract Methods # |
| ####################################################### |
| |
| # handle_message() |
| # |
| # Handle a custom message. This will be called in the main process in |
| # response to any messages sent to the main process using the |
| # Job.send_message() API from inside a Job.child_process() implementation. |
| # |
| # There is no need to implement this function if no custom messages are |
| # expected. |
| # |
| # Args: |
| # message (any): A simple object (must be pickle-able, i.e. strings, |
| # lists, dicts, numbers, but not Element instances). |
| # |
| def handle_message(self, message): |
| raise ImplError("Job '{kind}' does not implement handle_message()" |
| .format(kind=type(self).__name__)) |
| |
| # parent_complete() |
| # |
| # This will be executed in the main process after the job finishes, and is |
| # expected to pass the result to the main thread. |
| # |
| # Args: |
| # status (JobStatus): The job exit status |
| # result (any): The result returned by child_process(). |
| # |
| def parent_complete(self, status, result): |
| raise ImplError("Job '{kind}' does not implement parent_complete()" |
| .format(kind=type(self).__name__)) |
| |
| # create_child_job() |
| # |
| # Called by a Job instance to create a child job. |
| # |
| # The child job object is an instance of a subclass of ChildJob. |
| # |
| # The child job object's child_process() method will be executed in another |
| # process, so that work is done in parallel. See the documentation for the |
| # Job class for more information on this relationship. |
| # |
| # This method must be overridden by Job subclasses. |
| # |
| # Returns: |
| # (ChildJob): An instance of a subclass of ChildJob. |
| # |
| def create_child_job(self, *args, **kwargs): |
| raise ImplError("Job '{kind}' does not implement create_child_job()" |
| .format(kind=type(self).__name__)) |
| |
| ####################################################### |
| # Local Private Methods # |
| ####################################################### |
| |
| # _parent_shutdown() |
| # |
| # Shuts down the Job on the parent side by reading any remaining |
| # messages on the message queue and cleaning up any resources. |
| # |
| def _parent_shutdown(self): |
| # Make sure we've read everything we need and then stop listening |
| self._parent_process_queue() |
| self._parent_stop_listening() |
| |
| # _parent_child_completed() |
| # |
| # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() |
| # |
| # Args: |
| # pid (int): The PID of the child which completed |
| # returncode (int): The return code of the child process |
| # |
| def _parent_child_completed(self, pid, returncode): |
| self._parent_shutdown() |
| |
| # We don't want to retry if we got OK or a permanent fail. |
| retry_flag = returncode == _ReturnCode.FAIL |
| |
| if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: |
| self.start() |
| return |
| |
| # Resolve the outward facing overall job completion status |
| # |
| if returncode == _ReturnCode.OK: |
| status = JobStatus.OK |
| elif returncode == _ReturnCode.SKIPPED: |
| status = JobStatus.SKIPPED |
| elif returncode in (_ReturnCode.FAIL, _ReturnCode.PERM_FAIL): |
| status = JobStatus.FAIL |
| else: |
| status = JobStatus.FAIL |
| |
| self.parent_complete(status, self._result) |
| self._scheduler.job_completed(self, status) |
| |
| # Force the deletion of the queue and process objects to try and clean up FDs |
| self._queue = self._process = None |
| |
| # _parent_process_envelope() |
| # |
| # Processes a message Envelope deserialized form the message queue. |
| # |
| # this will have the side effect of assigning some local state |
| # on the Job in the parent process for later inspection when the |
| # child process completes. |
| # |
| # Args: |
| # envelope (Envelope): The message envelope |
| # |
| def _parent_process_envelope(self, envelope): |
| if not self._listening: |
| return |
| |
| if envelope.message_type is _MessageType.LOG_MESSAGE: |
| # Propagate received messages from children |
| # back through the context. |
| self._scheduler.context.messenger.message(envelope.message) |
| elif envelope.message_type is _MessageType.ERROR: |
| # For regression tests only, save the last error domain / reason |
| # reported from a child task in the main process, this global state |
| # is currently managed in _exceptions.py |
| set_last_task_error(envelope.message['domain'], |
| envelope.message['reason']) |
| elif envelope.message_type is _MessageType.RESULT: |
| assert self._result is None |
| self._result = envelope.message |
| elif envelope.message_type is _MessageType.CHILD_DATA: |
| # If we retry a job, we assign a new value to this |
| self.child_data = envelope.message |
| elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE: |
| self.handle_message(envelope.message) |
| else: |
| assert False, "Unhandled message type '{}': {}".format( |
| envelope.message_type, envelope.message) |
| |
| # _parent_process_queue() |
| # |
| # Reads back message envelopes from the message queue |
| # in the parent process. |
| # |
| def _parent_process_queue(self): |
| while not self._queue.empty(): |
| envelope = self._queue.get_nowait() |
| self._parent_process_envelope(envelope) |
| |
| # _parent_recv() |
| # |
| # A callback to handle I/O events from the message |
| # queue file descriptor in the main process message loop |
| # |
| def _parent_recv(self, *args): |
| self._parent_process_queue() |
| |
| # _parent_start_listening() |
| # |
| # Starts listening on the message queue |
| # |
| def _parent_start_listening(self): |
| # Warning: Platform specific code up ahead |
| # |
| # The multiprocessing.Queue object does not tell us how |
| # to receive io events in the receiving process, so we |
| # need to sneak in and get its file descriptor. |
| # |
| # The _reader member of the Queue is currently private |
| # but well known, perhaps it will become public: |
| # |
| # http://bugs.python.org/issue3831 |
| # |
| if not self._listening: |
| # self._scheduler.loop.add_reader( |
| # self._queue._reader.fileno(), self._parent_recv) |
| self._listening = True |
| |
| # _parent_stop_listening() |
| # |
| # Stops listening on the message queue |
| # |
| def _parent_stop_listening(self): |
| if self._listening: |
| # self._scheduler.loop.remove_reader(self._queue._reader.fileno()) |
| self._listening = False |
| |
| |
| # ChildJob() |
| # |
| # The ChildJob object represents the part of a parallel task that will run in a |
| # separate process. It has a close relationship with the parent Job that |
| # created it. |
| # |
| # See the documentation of the Job class for more on their relationship, and |
| # how to set up a (Job, ChildJob pair). |
| # |
| # The args below are passed from the parent Job to the ChildJob. |
| # |
| # Args: |
| # scheduler (Scheduler): The scheduler. |
| # action_name (str): The queue action name. |
| # logfile (str): A template string that points to the logfile |
| # that should be used - should contain {pid}. |
| # max_retries (int): The maximum number of retries. |
| # tries (int): The number of retries so far. |
| # message_unique_id (int): None, or the id to be supplied to the Message() constructor. |
| # task_id (int): None, or the plugin identifier for this job. |
| # |
| class ChildJob(): |
| |
| def __init__( |
| self, action_name, messenger, logdir, logfile, max_retries, tries, message_unique_id, task_id): |
| |
| self.action_name = action_name |
| |
| self._messenger = messenger |
| self._logdir = logdir |
| self._logfile = logfile |
| self._max_retries = max_retries |
| self._tries = tries |
| self._message_unique_id = message_unique_id |
| self._task_id = task_id |
| |
| self._queue = None |
| |
| # message(): |
| # |
| # Logs a message, this will be logged in the task's logfile and |
| # conditionally also be sent to the frontend. |
| # |
| # Args: |
| # message_type (MessageType): The type of message to send |
| # message (str): The message |
| # kwargs: Remaining Message() constructor arguments, note that you can |
| # override 'unique_id' this way. |
| # |
| def message(self, message_type, message, **kwargs): |
| kwargs['scheduler'] = True |
| unique_id = self._message_unique_id |
| if "unique_id" in kwargs: |
| unique_id = kwargs["unique_id"] |
| del kwargs["unique_id"] |
| self._messenger.message( |
| Message(unique_id, message_type, message, **kwargs)) |
| |
| # send_message() |
| # |
| # Send data in a message to the parent Job, running in the main process. |
| # |
| # This allows for custom inter-process communication between subclasses of |
| # Job and ChildJob. |
| # |
| # These messages will be processed by the Job.handle_message() |
| # implementation, which may be overridden to support one or more custom |
| # 'message_type's. |
| # |
| # Args: |
| # message_data (any): A simple object (must be pickle-able, i.e. |
| # strings, lists, dicts, numbers, but not Element |
| # instances). This is sent to the parent Job. |
| # |
| def send_message(self, message_data): |
| self._send_message(_MessageType.SUBCLASS_CUSTOM_MESSAGE, message_data) |
| |
| ####################################################### |
| # Abstract Methods # |
| ####################################################### |
| |
| # child_process() |
| # |
| # This will be executed after starting the child process, and is intended |
| # to perform the job's task. |
| # |
| # Returns: |
| # (any): A simple object (must be pickle-able, i.e. strings, lists, |
| # dicts, numbers, but not Element instances). It is returned to |
| # the parent Job running in the main process. This is taken as |
| # the result of the Job. |
| # |
| def child_process(self): |
| raise ImplError("ChildJob '{kind}' does not implement child_process()" |
| .format(kind=type(self).__name__)) |
| |
| # child_process_data() |
| # |
| # Abstract method to retrieve additional data that should be |
| # returned to the parent process. Note that the job result is |
| # retrieved independently. |
| # |
| # Values can later be retrieved in Job.child_data. |
| # |
| # Returns: |
| # (dict) A dict containing values to be reported to the main process |
| # |
| def child_process_data(self): |
| return {} |
| |
| # child_action() |
| # |
| # Perform the action in the child process, this calls the action_cb. |
| # |
| # Args: |
| # queue (multiprocessing.Queue): The message queue for IPC |
| # |
| def child_action(self, queue): |
| |
| # This avoids some SIGTSTP signals from grandchildren |
| # getting propagated up to the master process |
| #os.setsid() |
| |
| # First set back to the default signal handlers for the signals |
| # we handle, and then clear their blocked state. |
| # |
| #signal_list = [signal.SIGTSTP, signal.SIGTERM] |
| #for sig in signal_list: |
| # signal.signal(sig, signal.SIG_DFL) |
| #signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list) |
| |
| # Assign the queue we passed across the process boundaries |
| # |
| # Set the global message handler in this child |
| # process to forward messages to the parent process |
| self._queue = queue |
| self._messenger.set_message_handler(self._child_message_handler) |
| |
| starttime = datetime.datetime.now() |
| stopped_time = None |
| |
| def stop_time(): |
| nonlocal stopped_time |
| stopped_time = datetime.datetime.now() |
| |
| def resume_time(): |
| nonlocal stopped_time |
| nonlocal starttime |
| starttime += (datetime.datetime.now() - stopped_time) |
| |
| # Time, log and and run the action function |
| # |
| with _signals.suspendable(stop_time, resume_time), \ |
| self._messenger.recorded_messages(self._logfile, self._logdir) as filename: |
| |
| self.message(MessageType.START, self.action_name, logfile=filename) |
| |
| try: |
| # Try the task action |
| result = self.child_process() # pylint: disable=assignment-from-no-return |
| except SkipJob as e: |
| elapsed = datetime.datetime.now() - starttime |
| self.message(MessageType.SKIPPED, str(e), |
| elapsed=elapsed, logfile=filename) |
| |
| # Alert parent of skip by return code |
| self._child_shutdown(_ReturnCode.SKIPPED) |
| except BstError as e: |
| elapsed = datetime.datetime.now() - starttime |
| retry_flag = e.temporary |
| |
| if retry_flag and (self._tries <= self._max_retries): |
| self.message(MessageType.FAIL, |
| "Try #{} failed, retrying".format(self._tries), |
| elapsed=elapsed, logfile=filename) |
| else: |
| self.message(MessageType.FAIL, str(e), |
| elapsed=elapsed, detail=e.detail, |
| logfile=filename, sandbox=e.sandbox) |
| |
| self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) |
| |
| # Report the exception to the parent (for internal testing purposes) |
| self._child_send_error(e) |
| |
| # Set return code based on whether or not the error was temporary. |
| # |
| self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) |
| |
| except Exception: # pylint: disable=broad-except |
| |
| # If an unhandled (not normalized to BstError) occurs, that's a bug, |
| # send the traceback and formatted exception back to the frontend |
| # and print it to the log file. |
| # |
| elapsed = datetime.datetime.now() - starttime |
| detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) |
| |
| self.message(MessageType.BUG, self.action_name, |
| elapsed=elapsed, detail=detail, |
| logfile=filename) |
| # Unhandled exceptions should permenantly fail |
| self._child_shutdown(_ReturnCode.PERM_FAIL) |
| |
| else: |
| # No exception occurred in the action |
| self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) |
| self._child_send_result(result) |
| |
| elapsed = datetime.datetime.now() - starttime |
| self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, |
| logfile=filename) |
| |
| # Shutdown needs to stay outside of the above context manager, |
| # make sure we dont try to handle SIGTERM while the process |
| # is already busy in sys.exit() |
| self._child_shutdown(_ReturnCode.OK) |
| |
| ####################################################### |
| # Local Private Methods # |
| ####################################################### |
| |
| # _send_message() |
| # |
| # Send data in a message to the parent Job, running in the main process. |
| # |
| # Args: |
| # message_type (str): The type of message to send. |
| # message_data (any): A simple object (must be pickle-able, i.e. |
| # strings, lists, dicts, numbers, but not Element |
| # instances). This is sent to the parent Job. |
| # |
| def _send_message(self, message_type, message_data): |
| self._queue.put(_Envelope(message_type, message_data)) |
| |
| # _child_send_error() |
| # |
| # Sends an error to the main process through the message queue |
| # |
| # Args: |
| # e (Exception): The error to send |
| # |
| def _child_send_error(self, e): |
| domain = None |
| reason = None |
| |
| if isinstance(e, BstError): |
| domain = e.domain |
| reason = e.reason |
| |
| self._send_message(_MessageType.ERROR, { |
| 'domain': domain, |
| 'reason': reason |
| }) |
| |
| # _child_send_result() |
| # |
| # Sends the serialized result to the main process through the message queue |
| # |
| # Args: |
| # result (any): None, or a simple object (must be pickle-able, i.e. |
| # strings, lists, dicts, numbers, but not Element |
| # instances). |
| # |
| # Note: If None is passed here, nothing needs to be sent, the |
| # result member in the parent process will simply remain None. |
| # |
| def _child_send_result(self, result): |
| if result is not None: |
| self._send_message(_MessageType.RESULT, result) |
| |
| # _child_shutdown() |
| # |
| # Shuts down the child process by cleaning up and exiting the process |
| # |
| # Args: |
| # exit_code (_ReturnCode): The exit code to exit with |
| # |
| def _child_shutdown(self, exit_code): |
| # self._queue.close() |
| assert isinstance(exit_code, _ReturnCode) |
| sys.exit(int(exit_code)) |
| |
| # _child_message_handler() |
| # |
| # A Context delegate for handling messages, this replaces the |
| # frontend's main message handler in the context of a child task |
| # and performs local logging to the local log file before sending |
| # the message back to the parent process for further propagation. |
| # |
| # Args: |
| # message (Message): The message to log |
| # is_silenced (bool) : Whether messages are silenced |
| # |
| def _child_message_handler(self, message, is_silenced): |
| |
| message.action_name = self.action_name |
| message.task_id = self._task_id |
| |
| # Send to frontend if appropriate |
| if is_silenced and (message.message_type not in unconditional_messages): |
| return |
| |
| if message.message_type == MessageType.LOG: |
| return |
| |
| self._send_message(_MessageType.LOG_MESSAGE, message) |