# blob: 03b2833ec25211258dc47c97d9d56c73366082f4 [file] [log] [blame]
#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Angelos Evripiotis <jevripiotis@bloomberg.net>
import os
import datetime
from contextlib import contextmanager
from . import _signals
from . import utils
from ._exceptions import BstError
from ._message import Message, MessageType
_RENDER_INTERVAL = datetime.timedelta(seconds=1)
# Time in seconds for which we decide that we want to display subtask information
_DISPLAY_LIMIT = datetime.timedelta(seconds=3)
# If we're in the test suite, we need to ensure that we don't set a limit
if "BST_TEST_SUITE" in os.environ:
_DISPLAY_LIMIT = datetime.timedelta(seconds=0)
# TimeData class to contain times in an object that can be passed around
# and updated from different places
class _TimeData:
__slots__ = ["start_time"]
def __init__(self, start_time):
self.start_time = start_time
class Messenger:
    # Messenger()
    #
    # Central point through which status messages pass: every message is
    # optionally recorded to a log file and then forwarded to the
    # configured message handler.
    #
    def __init__(self):
        self._message_handler = None  # Callable receiving (message, is_silenced=...)
        self._silence_scope_depth = 0  # Number of active silence() scopes
        self._log_handle = None  # Open log file while recorded_messages() is active
        self._log_filename = None  # Path of that log file, or None
        self._state = None  # State object used by simple_task(), or None
        self._next_render = None  # datetime of the next allowed status render, or None
        self._active_simple_tasks = 0  # Number of simple_task() scopes currently open
        self._render_status_cb = None  # Argument-less callback which renders the status

    # set_message_handler()
    #
    # Sets the handler for any status messages propagated through
    # the context.
    #
    # The handler should have the signature:
    #
    #   def handler(
    #      message: _message.Message,  # The message to send.
    #      is_silenced: bool,          # Whether messages are currently being silenced.
    #   ) -> None
    #
    def set_message_handler(self, handler):
        self._message_handler = handler

    # set_state()
    #
    # Sets the State object within the Messenger
    #
    # Args:
    #    state (State): The state to set
    #
    def set_state(self, state):
        self._state = state

    # set_render_status_cb()
    #
    # Sets the callback to use to render status
    #
    # Args:
    #    callback (function): The Callback to be notified
    #
    # Callback Args:
    #    There are no arguments to the callback
    #
    def set_render_status_cb(self, callback):
        self._render_status_cb = callback

    # _silent_messages():
    #
    # Returns:
    #    (bool): Whether messages are currently being silenced
    #
    def _silent_messages(self):
        return self._silence_scope_depth > 0

    # message():
    #
    # Proxies a message back to the caller, this is the central
    # point through which all messages pass.
    #
    # Args:
    #    message: A Message object
    #
    def message(self, message):
        # If we are recording messages, dump a copy into the open log file.
        self._record_message(message)

        # Send it off to the log handler (can be the frontend,
        # or it can be the child task which will propagate
        # to the frontend)
        assert self._message_handler
        self._message_handler(message, is_silenced=self._silent_messages())

    # silence()
    #
    # A context manager to silence messages, this behaves in
    # the same way as the `silent_nested` argument of the
    # timed_activity() context manager: all but
    # _message.unconditional_messages will be silenced.
    #
    # Args:
    #    actually_silence (bool): Whether to actually do the silencing, if
    #                             False then this context manager does not
    #                             affect anything.
    #
    @contextmanager
    def silence(self, *, actually_silence=True):
        if not actually_silence:
            yield
            return

        self._silence_scope_depth += 1
        try:
            yield
        finally:
            # Always unwind the depth, even when the body raised
            assert self._silence_scope_depth > 0
            self._silence_scope_depth -= 1

    # timed_activity()
    #
    # Context manager for performing timed activities and logging those
    #
    # A START message is sent on entry, then either a FAIL message (when
    # the body raises BstError, which is reraised) or a SUCCESS message
    # carrying the elapsed time.
    #
    # Args:
    #    activity_name (str): The name of the activity
    #    element_name (str): Optionally, the element full name of the plugin related to the message
    #    detail (str): An optional detailed message, can be multiline output
    #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
    #
    @contextmanager
    def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False):
        with self._timed_suspendable() as timedata:
            try:
                # Announce the start of the activity
                message = Message(MessageType.START, activity_name, detail=detail, element_name=element_name)
                self.message(message)

                with self.silence(actually_silence=silent_nested):
                    yield

            except BstError:
                # Note the failure in status messages and reraise, the scheduler
                # expects an error when there is an error.
                elapsed = datetime.datetime.now() - timedata.start_time
                message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
                self.message(message)
                raise

            elapsed = datetime.datetime.now() - timedata.start_time
            message = Message(MessageType.SUCCESS, activity_name, elapsed=elapsed, element_name=element_name)
            self.message(message)

    # simple_task()
    #
    # Context manager for creating a task to report progress to.
    #
    # When no State has been set, this falls back to timed_activity()
    # and yields None instead of a task.
    #
    # Args:
    #    activity_name (str): The name of the activity
    #    element_name (str): Optionally, the element full name of the plugin related to the message
    #    full_name (str): Optionally, the distinguishing name of the activity, e.g. element name
    #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
    #
    # Yields:
    #    Task: A Task object that represents this activity, principally used to report progress
    #
    @contextmanager
    def simple_task(self, activity_name, *, element_name=None, full_name=None, silent_nested=False):
        # Bypass use of State when none exists (e.g. tests)
        if not self._state:
            with self.timed_activity(activity_name, element_name=element_name, silent_nested=silent_nested):
                yield
            return

        if not full_name:
            full_name = activity_name

        with self._timed_suspendable() as timedata:
            # Stays None until the task is registered, so that the cleanup
            # below only undoes what was actually set up.
            task = None
            try:
                message = Message(MessageType.START, activity_name, element_name=element_name)
                self.message(message)

                task = self._state.add_task(activity_name, full_name)

                # Account for the task right after registering it, so the
                # counter and the registration can never get out of step.
                self._active_simple_tasks += 1
                if not self._next_render:
                    self._next_render = datetime.datetime.now() + _RENDER_INTERVAL
                task.set_render_cb(self._render_status)

                with self.silence(actually_silence=silent_nested):
                    yield task

            except BstError:
                elapsed = datetime.datetime.now() - timedata.start_time
                message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
                self.message(message)
                raise
            finally:
                if task is not None:
                    self._state.remove_task(activity_name, full_name)
                    self._active_simple_tasks -= 1
                    if self._active_simple_tasks == 0:
                        # No task left to render progress for
                        self._next_render = None

            elapsed = datetime.datetime.now() - timedata.start_time
            detail = None

            # Only mention subtask progress for tasks which ran long enough
            if task.current_progress is not None and elapsed > _DISPLAY_LIMIT:
                if task.maximum_progress is not None:
                    detail = "{} of {} subtasks processed".format(task.current_progress, task.maximum_progress)
                else:
                    detail = "{} subtasks processed".format(task.current_progress)
            message = Message(
                MessageType.SUCCESS, activity_name, elapsed=elapsed, detail=detail, element_name=element_name
            )
            self.message(message)

    # recorded_messages()
    #
    # Records all messages in a log file while the context manager
    # is active.
    #
    # In addition to automatically writing all messages to the
    # specified logging file, an open file handle for process stdout
    # and stderr will be available via the Messenger.get_log_handle() API,
    # and the full logfile path will be available via the
    # Messenger.get_log_filename() API.
    #
    # Args:
    #     filename (str): A logging directory relative filename,
    #                     the pid and .log extension will be automatically
    #                     appended
    #
    #     logdir (str)  : The path to the log file directory.
    #
    # Yields:
    #     (str): The fully qualified log filename
    #
    @contextmanager
    def recorded_messages(self, filename, logdir):
        # We don't allow recursing in this context manager, and
        # we also do not allow it in the main process.
        assert self._log_handle is None
        assert self._log_filename is None
        assert not utils._is_main_process()

        # Create the fully qualified logfile in the log directory,
        # appending the pid and .log extension at the end.
        self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))

        try:
            # Ensure the directory exists first
            directory = os.path.dirname(self._log_filename)
            os.makedirs(directory, exist_ok=True)

            with open(self._log_filename, "a") as logfile:

                # Write one last line to the log and flush it to disk
                def flush_log():

                    # If the process currently had something happening in the I/O stack
                    # then trying to reenter the I/O stack will fire a runtime error.
                    #
                    # So just try to flush as well as we can at SIGTERM time
                    try:
                        logfile.write("\n\nForcefully terminated\n")
                        logfile.flush()
                    except RuntimeError:
                        os.fsync(logfile.fileno())

                self._log_handle = logfile
                with _signals.terminator(flush_log):
                    yield self._log_filename
        finally:
            # Reset even when the body raised, otherwise the messenger
            # would keep recording into a file that is already closed.
            self._log_handle = None
            self._log_filename = None

    # get_log_handle()
    #
    # Fetches the active log handle, this will return the active
    # log file handle when the Messenger.recorded_messages() context
    # manager is active
    #
    # Returns:
    #     (file): The active logging file handle, or None
    #
    def get_log_handle(self):
        return self._log_handle

    # get_log_filename()
    #
    # Fetches the active log filename, this will return the active
    # log filename when the Messenger.recorded_messages() context
    # manager is active
    #
    # Returns:
    #     (str): The active logging filename, or None
    #
    def get_log_filename(self):
        return self._log_filename

    # _record_message()
    #
    # Records the message if recording is enabled
    #
    # The line written has the form "[timecode] TYPE [element]: message",
    # followed by the indented detail when the message carries one.
    #
    # Args:
    #    message (Message): The message to record
    #
    def _record_message(self, message):

        if self._log_handle is None:
            return

        INDENT = " "
        EMPTYTIME = "--:--:--"
        template = "[{timecode: <8}] {type: <7}"

        # If this message is associated with an element or source plugin, print the
        # full element name of the instance.
        element_name = ""
        if message.element_name:
            template += " {element_name}"
            element_name = message.element_name

        template += ": {message}"

        detail = ""
        if message.detail is not None:
            template += "\n\n{detail}"
            detail = message.detail.rstrip("\n")
            # Prefix every line of the detail with the indent
            detail = INDENT + INDENT.join(detail.splitlines(True))

        # Only SUCCESS and FAIL messages report a duration; fall back to the
        # empty timecode if such a message was created without one.
        timecode = EMPTYTIME
        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL) and message.elapsed is not None:
            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
            minutes, seconds = divmod(remainder, 60)
            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)

        text = template.format(
            timecode=timecode,
            element_name=element_name,
            type=message.message_type.upper(),
            message=message.message,
            detail=detail,
        )

        # Write to the open log file
        self._log_handle.write("{}\n".format(text))
        self._log_handle.flush()

    # get_state_for_child_job_pickling(self)
    #
    # Return data necessary to reconstruct this object in a child job process.
    #
    # This should be implemented the same as __getstate__(). We define this
    # method instead as it is child job specific.
    #
    # Returns:
    #    (dict): This `state` is what we want `self.__dict__` to be restored to
    #    after instantiation in the child process.
    #
    def get_state_for_child_job_pickling(self):
        # Work on a shallow copy so the main process' Messenger is untouched
        state = self.__dict__.copy()

        # When pickling a Messenger over to the ChildJob, we don't want to bring
        # the whole _message_handler over with it. We also don't want to remove it
        # in the main process. If we remove it in the child process then we will
        # already be too late. The only time that seems just right is here, when
        # preparing the child process' copy of the Messenger.
        #
        # Another approach might be to use a context manager on the Messenger,
        # which removes and restores the _message_handler. This wouldn't require
        # access to private details of Messenger, but it would open up a window
        # where messages wouldn't be handled as expected.
        #
        del state["_message_handler"]

        # The render status callback is only used in the main process
        #
        del state["_render_status_cb"]

        # The "simple_task" context manager is not needed outside the main
        # process. During testing we override it to something that cannot
        # pickle, so just drop it when pickling to a child job. Note that it
        # will only appear in 'state' if it has been overridden.
        #
        state.pop("simple_task", None)

        # The State object is not needed outside the main process
        del state["_state"]

        return state

    # _render_status()
    #
    # Calls the render status callback set in the messenger, but only if
    # the render interval has passed since it last rendered.
    #
    def _render_status(self):
        assert self._next_render

        now = datetime.datetime.now()
        if self._render_status_cb and now >= self._next_render:
            self._render_status_cb()
            self._next_render = now + _RENDER_INTERVAL

    # _timed_suspendable()
    #
    # A contextmanager that allows an activity to be suspended and can
    # adjust for clock drift caused by suspending
    #
    # Yields:
    #    TimeData: An object that contains the time the activity started
    #
    @contextmanager
    def _timed_suspendable(self):
        # Note: timedata is a mutable object so that the start time can
        #       still be adjusted after it has been yielded
        timedata = _TimeData(start_time=datetime.datetime.now())
        stopped_time = None

        def stop_time():
            nonlocal stopped_time
            stopped_time = datetime.datetime.now()

        def resume_time():
            nonlocal stopped_time
            if stopped_time is None:
                # Resumed without a recorded stop, nothing to compensate for
                return

            # Shift the start time forward by the time spent suspended, so
            # that it does not count towards the elapsed time
            sleep_time = datetime.datetime.now() - stopped_time
            timedata.start_time += sleep_time
            stopped_time = None

        with _signals.suspendable(stop_time, resume_time):
            yield timedata