blob: d74d391436a93d91d13535aef66948ce0327eac3 [file] [log] [blame]
#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Angelos Evripiotis <jevripiotis@bloomberg.net>
import os
import datetime
from contextlib import contextmanager
from . import _signals
from . import utils
from ._exceptions import BstError
from ._message import Message, MessageType
from .plugin import Plugin
class Messenger():
def __init__(self):
self._message_handler = None
self._silence_scope_depth = 0
self._log_handle = None
self._log_filename = None
# set_message_handler()
#
# Sets the handler for any status messages propagated through
# the context.
#
# The handler should have the signature:
#
# def handler(
# message: _message.Message, # The message to send.
# is_silenced: bool, # Whether messages are currently being silenced.
# ) -> None
#
def set_message_handler(self, handler):
self._message_handler = handler
# _silent_messages():
#
# Returns:
# (bool): Whether messages are currently being silenced
#
def _silent_messages(self):
return self._silence_scope_depth > 0
# message():
#
# Proxies a message back to the caller, this is the central
# point through which all messages pass.
#
# Args:
# message: A Message object
#
def message(self, message):
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
# Send it off to the log handler (can be the frontend,
# or it can be the child task which will propagate
# to the frontend)
assert self._message_handler
self._message_handler(message, is_silenced=self._silent_messages())
# silence()
#
# A context manager to silence messages, this behaves in
# the same way as the `silent_nested` argument of the
# timed_activity() context manager: all but
# _message.unconditional_messages will be silenced.
#
# Args:
# actually_silence (bool): Whether to actually do the silencing, if
# False then this context manager does not
# affect anything.
#
@contextmanager
def silence(self, *, actually_silence=True):
if not actually_silence:
yield
return
self._silence_scope_depth += 1
try:
yield
finally:
assert self._silence_scope_depth > 0
self._silence_scope_depth -= 1
# timed_activity()
#
# Context manager for performing timed activities and logging those
#
# Args:
# activity_name (str): The name of the activity
# context (Context): The invocation context object
# unique_id (int): Optionally, the unique id of the plugin related to the message
# detail (str): An optional detailed message, can be multiline output
# silent_nested (bool): If True, all but _message.unconditional_messages are silenced
#
@contextmanager
def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
starttime = datetime.datetime.now()
stopped_time = None
def stop_time():
nonlocal stopped_time
stopped_time = datetime.datetime.now()
def resume_time():
nonlocal stopped_time
nonlocal starttime
sleep_time = datetime.datetime.now() - stopped_time
starttime += sleep_time
with _signals.suspendable(stop_time, resume_time):
try:
# Push activity depth for status messages
message = Message(unique_id, MessageType.START, activity_name, detail=detail)
self.message(message)
with self.silence(actually_silence=silent_nested):
yield
except BstError:
# Note the failure in status messages and reraise, the scheduler
# expects an error when there is an error.
elapsed = datetime.datetime.now() - starttime
message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
self.message(message)
raise
elapsed = datetime.datetime.now() - starttime
message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
self.message(message)
# recorded_messages()
#
# Records all messages in a log file while the context manager
# is active.
#
# In addition to automatically writing all messages to the
# specified logging file, an open file handle for process stdout
# and stderr will be available via the Messenger.get_log_handle() API,
# and the full logfile path will be available via the
# Messenger.get_log_filename() API.
#
# Args:
# filename (str): A logging directory relative filename,
# the pid and .log extension will be automatically
# appended
#
# logdir (str) : The path to the log file directory.
#
# Yields:
# (str): The fully qualified log filename
#
@contextmanager
def recorded_messages(self, filename, logdir):
# We dont allow recursing in this context manager, and
# we also do not allow it in the main process.
assert self._log_handle is None
assert self._log_filename is None
# Need to deal with global _main_pid var.
# assert not utils._is_main_process()
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
self._log_filename = os.path.join(logdir,
'{}.{}.log'.format(filename, os.getpid()))
# Ensure the directory exists first
directory = os.path.dirname(self._log_filename)
os.makedirs(directory, exist_ok=True)
with open(self._log_filename, 'a') as logfile:
# Write one last line to the log and flush it to disk
def flush_log():
# If the process currently had something happening in the I/O stack
# then trying to reenter the I/O stack will fire a runtime error.
#
# So just try to flush as well as we can at SIGTERM time
try:
logfile.write('\n\nForcefully terminated\n')
logfile.flush()
except RuntimeError:
os.fsync(logfile.fileno())
self._log_handle = logfile
with _signals.terminator(flush_log):
yield self._log_filename
self._log_handle = None
self._log_filename = None
# get_log_handle()
#
# Fetches the active log handle, this will return the active
# log file handle when the Messenger.recorded_messages() context
# manager is active
#
# Returns:
# (file): The active logging file handle, or None
#
def get_log_handle(self):
return self._log_handle
# get_log_filename()
#
# Fetches the active log filename, this will return the active
# log filename when the Messenger.recorded_messages() context
# manager is active
#
# Returns:
# (str): The active logging filename, or None
#
def get_log_filename(self):
return self._log_filename
# _record_message()
#
# Records the message if recording is enabled
#
# Args:
# message (Message): The message to record
#
def _record_message(self, message):
if self._log_handle is None:
return
INDENT = " "
EMPTYTIME = "--:--:--"
template = "[{timecode: <8}] {type: <7}"
# If this message is associated with a plugin, print what
# we know about the plugin.
plugin_name = ""
# if message.unique_id:
# template += " {plugin}"
# plugin = Plugin._lookup(message.unique_id)
# plugin_name = plugin.name
template += ": {message}"
detail = ''
if message.detail is not None:
template += "\n\n{detail}"
detail = message.detail.rstrip('\n')
detail = INDENT + INDENT.join(detail.splitlines(True))
timecode = EMPTYTIME
if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
minutes, seconds = divmod(remainder, 60)
timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
text = template.format(timecode=timecode,
plugin=plugin_name,
type=message.message_type.upper(),
message=message.message,
detail=detail)
# Write to the open log file
self._log_handle.write('{}\n'.format(text))
self._log_handle.flush()