| # |
| # Copyright (C) 2019 Bloomberg Finance LP |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this library. If not, see <http://www.gnu.org/licenses/>. |
| # |
| # Authors: |
| # Angelos Evripiotis <jevripiotis@bloomberg.net> |
| |
| import os |
| import datetime |
| from contextlib import contextmanager |
| |
| from . import _signals |
| from . import utils |
| from ._exceptions import BstError |
| from ._message import Message, MessageType |
| |
| |
# Minimum delay between two status renders triggered through
# Messenger._render_status() by simple task progress updates.
_RENDER_INTERVAL = datetime.timedelta(seconds=1)


# Elapsed time a simple task must exceed before its subtask progress is
# included in the SUCCESS message detail. Within the test suite the limit
# is dropped to zero so that the detail is always emitted.
_DISPLAY_LIMIT = (
    datetime.timedelta(seconds=0)
    if "BST_TEST_SUITE" in os.environ
    else datetime.timedelta(seconds=3)
)
| |
| |
class _TimeData:
    """Mutable holder for the start time of an activity.

    The object is handed out by reference, so code that receives it (for
    example a resume callback adjusting for suspended time) can update
    ``start_time`` and every holder sees the new value.
    """

    __slots__ = ("start_time",)

    def __init__(self, start_time):
        self.start_time = start_time
| |
| |
# Messenger
#
# Central point through which all messages pass: it forwards messages to
# the configured message handler, optionally records them to a log file,
# and provides context managers for silencing, timed activities and
# progress-reporting simple tasks.
#
class Messenger:
    def __init__(self):
        self._message_handler = None  # Callable receiving every message, see set_message_handler()
        self._silence_scope_depth = 0  # Nesting depth of active silence() scopes
        self._log_handle = None  # Open log file while recorded_messages() is active, else None
        self._log_filename = None  # Full log file path while recorded_messages() is active, else None
        self._state = None  # State object, not carried over to child jobs
        self._next_render = None  # A datetime.datetime: earliest time of the next status render
        self._active_simple_tasks = 0  # Number of simple_task() scopes currently registered with State
        self._render_status_cb = None  # Callback used to render status, see set_render_status_cb()

    # set_message_handler()
    #
    # Sets the handler for any status messages propagated through
    # the context.
    #
    # The handler should have the signature:
    #
    #   def handler(
    #       message: _message.Message,  # The message to send.
    #       is_silenced: bool,          # Whether messages are currently being silenced.
    #   ) -> None
    #
    def set_message_handler(self, handler):
        self._message_handler = handler

    # set_state()
    #
    # Sets the State object within the Messenger
    #
    # Args:
    #    state (State): The state to set
    #
    def set_state(self, state):
        self._state = state

    # set_render_status_cb()
    #
    # Sets the callback to use to render status
    #
    # Args:
    #    callback (function): The Callback to be notified
    #
    # Callback Args:
    #    There are no arguments to the callback
    #
    def set_render_status_cb(self, callback):
        self._render_status_cb = callback

    # _silent_messages():
    #
    # Returns:
    #    (bool): Whether messages are currently being silenced
    #
    def _silent_messages(self):
        return self._silence_scope_depth > 0

    # message():
    #
    # Proxies a message back to the caller, this is the central
    # point through which all messages pass.
    #
    # Args:
    #    message: A Message object
    #
    def message(self, message):

        # If we are recording messages, dump a copy into the open log file.
        self._record_message(message)

        # Send it off to the log handler (can be the frontend,
        # or it can be the child task which will propagate
        # to the frontend)
        assert self._message_handler

        self._message_handler(message, is_silenced=self._silent_messages())

    # silence()
    #
    # A context manager to silence messages, this behaves in
    # the same way as the `silent_nested` argument of the
    # timed_activity() context manager: all but
    # _message.unconditional_messages will be silenced.
    #
    # Args:
    #    actually_silence (bool): Whether to actually do the silencing, if
    #                             False then this context manager does not
    #                             affect anything.
    #
    @contextmanager
    def silence(self, *, actually_silence=True):
        if not actually_silence:
            yield
            return

        self._silence_scope_depth += 1
        try:
            yield
        finally:
            assert self._silence_scope_depth > 0
            self._silence_scope_depth -= 1

    # timed_activity()
    #
    # Context manager for performing timed activities and logging those
    #
    # A START message is sent on entry, a SUCCESS message (with the elapsed
    # time) on normal exit, and a FAIL message if a BstError escapes, after
    # which the BstError is re-raised.
    #
    # Args:
    #    activity_name (str): The name of the activity
    #    element_name (str): Optionally, the element full name of the plugin related to the message
    #    detail (str): An optional detailed message, can be multiline output
    #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
    #
    @contextmanager
    def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False):
        with self._timed_suspendable() as timedata:
            try:
                # Announce the start of the activity
                message = Message(MessageType.START, activity_name, detail=detail, element_name=element_name)
                self.message(message)
                with self.silence(actually_silence=silent_nested):
                    yield

            except BstError:
                # Note the failure in status messages and reraise, the scheduler
                # expects an error when there is an error.
                elapsed = datetime.datetime.now() - timedata.start_time
                message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
                self.message(message)
                raise

            elapsed = datetime.datetime.now() - timedata.start_time
            message = Message(MessageType.SUCCESS, activity_name, elapsed=elapsed, element_name=element_name)
            self.message(message)

    # simple_task()
    #
    # Context manager for creating a task to report progress to.
    #
    # When no State has been set, this falls back to timed_activity() and
    # yields None instead of a Task.
    #
    # Args:
    #    activity_name (str): The name of the activity
    #    element_name (str): Optionally, the element full name of the plugin related to the message
    #    full_name (str): Optionally, the distinguishing name of the activity, e.g. element name
    #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
    #
    # Yields:
    #    Task: A Task object that represents this activity, principally used to report progress
    #
    @contextmanager
    def simple_task(self, activity_name, *, element_name=None, full_name=None, silent_nested=False):
        # Bypass use of State when none exists (e.g. tests)
        if not self._state:
            with self.timed_activity(activity_name, element_name=element_name, silent_nested=silent_nested):
                yield
            return

        if not full_name:
            full_name = activity_name

        with self._timed_suspendable() as timedata:
            # Whether the task has been registered with State. The cleanup
            # below must only undo a registration that actually happened.
            # Otherwise a failure while sending the START message (or inside
            # add_task) would remove a task that was never added and drive
            # the active task counter negative.
            registered = False
            try:
                message = Message(MessageType.START, activity_name, element_name=element_name)
                self.message(message)

                task = self._state.add_task(activity_name, full_name)
                registered = True
                self._active_simple_tasks += 1
                task.set_render_cb(self._render_status)
                if not self._next_render:
                    self._next_render = datetime.datetime.now() + _RENDER_INTERVAL

                with self.silence(actually_silence=silent_nested):
                    yield task

            except BstError:
                elapsed = datetime.datetime.now() - timedata.start_time
                message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
                self.message(message)
                raise
            finally:
                if registered:
                    self._state.remove_task(activity_name, full_name)
                    self._active_simple_tasks -= 1
                    # No simple task is left to report progress, stop throttling renders
                    if self._active_simple_tasks == 0:
                        self._next_render = None

            elapsed = datetime.datetime.now() - timedata.start_time
            detail = None

            # Only report subtask progress for tasks that took noticeably long
            if task.current_progress is not None and elapsed > _DISPLAY_LIMIT:
                if task.maximum_progress is not None:
                    detail = "{} of {} subtasks processed".format(task.current_progress, task.maximum_progress)
                else:
                    detail = "{} subtasks processed".format(task.current_progress)
            message = Message(
                MessageType.SUCCESS, activity_name, elapsed=elapsed, detail=detail, element_name=element_name
            )
            self.message(message)

    # recorded_messages()
    #
    # Records all messages in a log file while the context manager
    # is active.
    #
    # In addition to automatically writing all messages to the
    # specified logging file, an open file handle for process stdout
    # and stderr will be available via the Messenger.get_log_handle() API,
    # and the full logfile path will be available via the
    # Messenger.get_log_filename() API.
    #
    # The log handle and filename are reset when the context exits, whether
    # normally or through an exception.
    #
    # Args:
    #     filename (str): A logging directory relative filename,
    #                     the pid and .log extension will be automatically
    #                     appended
    #
    #     logdir (str)  : The path to the log file directory.
    #
    # Yields:
    #     (str): The fully qualified log filename
    #
    @contextmanager
    def recorded_messages(self, filename, logdir):

        # We dont allow recursing in this context manager, and
        # we also do not allow it in the main process.
        #
        # These checks are deliberately outside the try/finally below: a
        # failed check must not clear the state of an enclosing recording.
        assert self._log_handle is None
        assert self._log_filename is None
        assert not utils._is_main_process()

        try:
            # Create the fully qualified logfile in the log directory,
            # appending the pid and .log extension at the end.
            self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))

            # Ensure the directory exists first
            directory = os.path.dirname(self._log_filename)
            os.makedirs(directory, exist_ok=True)

            with open(self._log_filename, "a") as logfile:

                # Write one last line to the log and flush it to disk
                def flush_log():

                    # If the process currently had something happening in the I/O stack
                    # then trying to reenter the I/O stack will fire a runtime error.
                    #
                    # So just try to flush as well as we can at SIGTERM time
                    try:
                        logfile.write("\n\nForcefully terminated\n")
                        logfile.flush()
                    except RuntimeError:
                        os.fsync(logfile.fileno())

                self._log_handle = logfile
                with _signals.terminator(flush_log):
                    yield self._log_filename
        finally:
            # Always drop the (now closed) handle, so later messages are not
            # written to a closed file and a new recording can be started.
            self._log_handle = None
            self._log_filename = None

    # get_log_handle()
    #
    # Fetches the active log handle, this will return the active
    # log file handle when the Messenger.recorded_messages() context
    # manager is active
    #
    # Returns:
    #     (file): The active logging file handle, or None
    #
    def get_log_handle(self):
        return self._log_handle

    # get_log_filename()
    #
    # Fetches the active log filename, this will return the active
    # log filename when the Messenger.recorded_messages() context
    # manager is active
    #
    # Returns:
    #     (str): The active logging filename, or None
    #
    def get_log_filename(self):
        return self._log_filename

    # _record_message()
    #
    # Records the message if recording is enabled
    #
    # The message is written as one line of the form
    # "[timecode] TYPE element_name: message", followed by the indented
    # detail (if any). The timecode is only filled in for SUCCESS and FAIL
    # messages, which carry an elapsed time.
    #
    # Args:
    #    message (Message): The message to record
    #
    def _record_message(self, message):

        if self._log_handle is None:
            return

        INDENT = " "
        EMPTYTIME = "--:--:--"
        template = "[{timecode: <8}] {type: <7}"

        # If this message is associated with an element or source plugin, print the
        # full element name of the instance.
        element_name = ""
        if message.element_name:
            template += " {element_name}"
            element_name = message.element_name

        template += ": {message}"

        detail = ""
        if message.detail is not None:
            template += "\n\n{detail}"
            detail = message.detail.rstrip("\n")
            detail = INDENT + INDENT.join(detail.splitlines(True))

        timecode = EMPTYTIME
        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
            minutes, seconds = divmod(remainder, 60)
            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)

        text = template.format(
            timecode=timecode,
            element_name=element_name,
            type=message.message_type.upper(),
            message=message.message,
            detail=detail,
        )

        # Write to the open log file
        self._log_handle.write("{}\n".format(text))
        self._log_handle.flush()

    # get_state_for_child_job_pickling(self)
    #
    # Return data necessary to reconstruct this object in a child job process.
    #
    # This should be implemented the same as __getstate__(). We define this
    # method instead as it is child job specific.
    #
    # Returns:
    #    (dict): This `state` is what we want `self.__dict__` to be restored to
    #    after instantiation in the child process.
    #
    def get_state_for_child_job_pickling(self):
        state = self.__dict__.copy()

        # When pickling a Messenger over to the ChildJob, we don't want to bring
        # the whole _message_handler over with it. We also don't want to remove it
        # in the main process. If we remove it in the child process then we will
        # already be too late. The only time that seems just right is here, when
        # preparing the child process' copy of the Messenger.
        #
        # Another approach might be to use a context manager on the Messenger,
        # which removes and restores the _message_handler. This wouldn't require
        # access to private details of Messenger, but it would open up a window
        # where messages wouldn't be handled as expected.
        #
        del state["_message_handler"]

        # The render status callback is only used in the main process
        #
        del state["_render_status_cb"]

        # The "simple_task" context manager is not needed outside the main
        # process. During testing we override it to something that cannot
        # pickle, so just drop it when pickling to a child job. Note that it
        # will only appear in 'state' if it has been overridden.
        #
        state.pop("simple_task", None)

        # The State object is not needed outside the main process
        del state["_state"]

        return state

    # _render_status()
    #
    # Calls the render status callback set in the messenger, but only if
    # _RENDER_INTERVAL has passed since it last rendered.
    #
    # This is installed as the render callback of tasks created by
    # simple_task(), so it is presumably only triggered while a simple task
    # is active and _next_render is set.
    #
    def _render_status(self):
        assert self._next_render

        now = datetime.datetime.now()
        if self._render_status_cb and now >= self._next_render:
            self._render_status_cb()
            self._next_render = now + _RENDER_INTERVAL

    # _timed_suspendable()
    #
    # A contextmanager that allows an activity to be suspended and can
    # adjust for clock drift caused by suspending
    #
    # Time spent suspended (between the stop and resume callbacks) is added
    # to the yielded start time, so that elapsed times exclude it.
    #
    # Yields:
    #    TimeData: An object that contains the time the activity started
    #
    @contextmanager
    def _timed_suspendable(self):
        # Note: timedata needs to be a mutable object (a _TimeData) so that
        # the yielded value can be updated in place when resuming
        timedata = _TimeData(start_time=datetime.datetime.now())
        stopped_time = None

        def stop_time():
            nonlocal stopped_time
            stopped_time = datetime.datetime.now()

        def resume_time():
            # Shift the start forward by the time spent suspended
            sleep_time = datetime.datetime.now() - stopped_time
            timedata.start_time += sleep_time

        with _signals.suspendable(stop_time, resume_time):
            yield timedata