blob: ddcbb09b9213ac7c646d7cf09b5a369013bbfa1f [file] [log] [blame]
#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
from typing import Optional, Tuple, List, Dict, Callable
from .types import _DisplayKey
# TaskGroup
#
# The state data stored for a group of tasks (usually scheduler queues)
#
# Args:
# name: The name of the Task Group, e.g. 'build'
# state: The state object
# complete_name: Optional name for frontend status rendering, e.g. 'built'
#
class TaskGroup:
def __init__(self, name: str, state: "State", complete_name: Optional[str] = None) -> None:
#
# Public members
#
self.name: str = name # The name of tasks in this group
self.complete_name: Optional[str] = complete_name # Optional name for frontend status rendering, e.g. 'built'
self.processed_tasks: int = 0 # Number of processed tasks
self.skipped_tasks: int = 0 # Number of skipped tasks
self.failed_tasks: List[str] = [] # List of element full names which failed
#
# Private members
#
self._state: "State" = state
###########################################
# Core-facing APIs to drive notifications #
###########################################
# add_processed_task()
#
# Update the TaskGroup's count of processed tasks and notify of changes
#
# This is a core-facing API and should not be called from the frontend
#
def add_processed_task(self) -> None:
self.processed_tasks += 1
for cb in self._state._task_groups_changed_cbs:
cb()
# add_skipped_task()
#
# Update the TaskGroup's count of skipped tasks and notify of changes
#
# This is a core-facing API and should not be called from the frontend
#
def add_skipped_task(self) -> None:
self.skipped_tasks += 1
for cb in self._state._task_groups_changed_cbs:
cb()
# add_failed_task()
#
# Update the TaskGroup's list of failed tasks and notify of changes
#
# Args:
# full_name: The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
#
# This is a core-facing API and should not be called from the frontend
#
def add_failed_task(self, full_name: str) -> None:
self.failed_tasks.append(full_name)
for cb in self._state._task_groups_changed_cbs:
cb()
# Task
#
# The state data stored for an individual task
#
# Args:
# state: The State object
# task_id: The unique identifier of the task
# action_name: The name of the action, e.g. 'build'
# full_name: The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset: The time the task started, relative to
# buildstream's start time.
class Task:
def __init__(
self, state: "State", task_id: str, action_name: str, full_name: str, elapsed_offset: datetime.timedelta
) -> None:
#
# Public members
#
self.id: str = task_id
self.action_name: str = action_name
self.full_name: str = full_name
self.elapsed_offset: datetime.timedelta = elapsed_offset
self.current_progress: Optional[int] = None
self.maximum_progress: Optional[int] = None
#
# Private members
#
self._state: "State" = state
self._task_changed_cb: Optional[Callable[[], None]] = None # Callback to call when something could be rendered
##############################################
# Core-facing APIs for driving notifications #
##############################################
# set_task_changed_callback()
#
# Sets the callback to be called when this task has
# changed.
#
# This is just a convenience codepath for the Messenger object
# run simple tasks outside of the scheduler context, rather
# than connecting to the State callbacks which are there for the
# purpose of the frontend to get notifications of task progress.
#
# Args:
# callback: The callback to call when progress changed
#
def set_task_changed_callback(self, callback: Optional[Callable[[], None]]) -> None:
self._task_changed_cb = callback
# set_maximum_progress()
#
# Sets the maximum progress possible for this task.
#
# Args:
# progress: The maximum progress possible for this task
#
def set_maximum_progress(self, progress: int) -> None:
self.maximum_progress = progress
self._notify_task_changed()
# set_current_progress()
#
# Sets the current progress of the task, this should
# be a number between 0 and the maximum progress, if a
# maximum progress has been set.
#
# Args:
# progress: The current progress
#
def set_current_progress(self, progress: int) -> None:
self.current_progress = progress
self._notify_task_changed()
# add_current_progress()
#
# A convenience function for incrementing the current
# progress of this task by 1.
#
def add_current_progress(self) -> None:
if self.current_progress is None:
new_progress = 1
else:
new_progress = self.current_progress + 1
self.set_current_progress(new_progress)
##############################################
# Private methods #
##############################################
def _notify_task_changed(self) -> None:
for cb in self._state._task_changed_cbs:
cb(self.id)
if self._task_changed_cb:
self._task_changed_cb()
# State
#
# The state data that is stored for the purpose of sharing with the frontend.
#
# BuildStream's Core is responsible for making changes to this data.
# BuildStream's Frontend may register callbacks with State to be notified
# when parts of State change, and read State to know what has changed.
#
# Args:
# session_start: The time the session started
#
class State:
def __init__(self, session_start: datetime.datetime) -> None:
#
# Public members
#
self.task_groups: Dict[str, TaskGroup] = {} # Dictionary of active task groups by group name
self.tasks: Dict[str, Task] = {} # Dictionary of active tasks by unique task ID
#
# Private members
#
self._session_start: datetime.datetime = session_start
self._task_added_cbs: List[Callable[[str], None]] = []
self._task_removed_cbs: List[Callable[[str], None]] = []
self._task_changed_cbs: List[Callable[[str], None]] = []
self._task_failed_cbs: List[Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]] = []
self._task_groups_changed_cbs: List[Callable[[], None]] = []
#####################################
# Frontend-facing notification APIs #
#####################################
# register_task_added_callback()
#
# Registers a callback to be notified when a task is added
#
# Args:
# callback: The callback to be notified
#
# Callback Args:
# task_id: The unique identifier of the task
#
def register_task_added_callback(self, callback: Callable[[str], None]) -> None:
self._task_added_cbs.append(callback)
# unregister_task_added_callback()
#
# Unregisters a callback previously registered by
# register_task_added_callback()
#
# Args:
# callback: The callback to be removed
#
def unregister_task_added_callback(self, callback: Callable[[str], None]) -> None:
self._task_added_cbs.remove(callback)
# register_task_removed_callback()
#
# Registers a callback to be notified when a task is removed
#
# Args:
# callback: The callback to be notified
#
# Callback Args:
# task_id: The unique identifier of the task
#
def register_task_removed_callback(self, callback: Callable[[str], None]) -> None:
self._task_removed_cbs.append(callback)
# unregister_task_removed_callback()
#
# Unregisters a callback previously registered by
# register_task_removed_callback()
#
# Args:
# callback: The callback to be notified
#
def unregister_task_removed_callback(self, callback: Callable[[str], None]) -> None:
self._task_removed_cbs.remove(callback)
# register_task_changed_callback()
#
# Register a callback to be notified when a task has changed
#
# Args:
# callback: The callback to be notified
#
# Callback Args:
# task_id: The unique identifier of the task
#
def register_task_changed_callback(self, callback: Callable[[str], None]) -> None:
self._task_changed_cbs.append(callback)
# unregister_task_changed_callback()
#
# Unregisters a callback previously registered by
# register_task_changed_callback()
#
# Args:
# callback: The callback to be notified
#
def unregister_task_changed_callback(self, callback: Callable[[str], None]) -> None:
self._task_changed_cbs.remove(callback)
# register_task_failed_callback()
#
# Registers a callback to be notified when a task has failed
#
# Args:
# callback (function): The callback to be notified
#
# Callback Args:
# task_id: The unique identifier of the task
# element: (optionally) The element unique_id and DisplayKey of an element job
#
def register_task_failed_callback(
self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]
) -> None:
self._task_failed_cbs.append(callback)
# unregister_task_failed_callback()
#
# Unregisters a callback previously registered by
# register_task_failed_callback()
#
# Args:
# callback (function): The callback to be removed
#
def unregister_task_failed_callback(
self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]
) -> None:
self._task_failed_cbs.remove(callback)
# register_task_groups_changed_callback()
#
# Registers a callback to be notified whenever the task groups info has changed
#
# Args:
# callback: The callback to be notified
#
# Callback Args:
# task_id: The unique identifier of the task
# element: (optionally) The element unique_id and DisplayKey of an element job
#
def register_task_groups_changed_callback(self, callback: Callable[[], None]) -> None:
self._task_groups_changed_cbs.append(callback)
# unregister_task_groups_changed_callback()
#
# Unregisters a callback previously registered by register_task_groups_changed_callback()
#
# Args:
# callback (function): The callback to be removed
#
def unregister_task_groups_changed_callback(self, callback: Callable[[], None]) -> None:
self._task_groups_changed_cbs.remove(callback)
##############################################
# Core-facing APIs for driving notifications #
##############################################
# add_task_group()
#
# Notification that a new task group has been added
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# name (str): The name of the task group, e.g. 'build'
# complete_name (str): Optional name to be used for frontend status rendering, e.g. 'built'
#
# Returns:
# TaskGroup: The task group created
#
def add_task_group(self, name, complete_name=None) -> TaskGroup:
assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups)
group = TaskGroup(name, self, complete_name)
self.task_groups[name] = group
return group
# remove_task_group()
#
# Notification that a task group has been removed
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# name (str): The name of the task group, e.g. 'build'
#
def remove_task_group(self, name) -> None:
# Rely on 'del' to raise an error when removing nonexistent task groups
del self.task_groups[name]
# add_task()
#
# Add a task and send appropriate notifications
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# task_id: The unique identifier of the task
# action_name: The name of the action, e.g. 'build'
# full_name: The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset (timedelta): (Optional) The time the task started, relative
# to buildstream's start time. Note scheduler tasks
# use this as they don't report relative to wallclock time
# if the Scheduler has been suspended.
#
# Returns:
# The new task
#
def add_task(
self, task_id: str, action_name: str, full_name: str, elapsed_offset: Optional[datetime.timedelta] = None
) -> Task:
assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format(
action_name, full_name, task_id, self.tasks
)
if not elapsed_offset:
elapsed_offset = self.elapsed_time()
task = Task(self, task_id, action_name, full_name, elapsed_offset)
self.tasks[task_id] = task
for cb in self._task_added_cbs:
cb(task_id)
return task
# remove_task()
#
# Remove the task and send appropriate notifications
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# task_id: The unique identifier of the task
#
def remove_task(self, task_id: str) -> None:
# Rely on 'del' to raise an error when removing nonexistent tasks
del self.tasks[task_id]
for cb in self._task_removed_cbs:
cb(task_id)
# fail_task()
#
# Notify all registered callbacks that a task has failed.
#
# This is separate from the tasks changed callbacks because a failed task
# requires the frontend to intervene to decide what happens next.
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# task_id: The unique identifier of the task
# element: (optionally) The element unique_id and display keys if an
# element job
#
def fail_task(self, task_id: str, element: Optional[Tuple[int, _DisplayKey]] = None) -> None:
for cb in self._task_failed_cbs:
cb(task_id, element)
# elapsed_time()
#
# Fetches the current session elapsed time
#
# Args:
# start_time: Optional explicit start time, relative to caller.
#
# Returns:
# The amount of time since the start of the session,
# discounting any time spent while jobs were suspended if
# start_time given relative to the Scheduler
#
def elapsed_time(self, start_time: Optional[datetime.datetime] = None) -> datetime.timedelta:
time_now = datetime.datetime.now()
if start_time is None:
start_time = self._session_start or time_now
return time_now - start_time
# offset_start_time()
#
# Update the 'start' time of the application by a given offset
#
# This allows modifying the time spent building when BuildStream
# gets paused then restarted, to give an accurate view of the real
# time spend building.
#
# Args:
# offset: the offset to add to the start time
#
def offset_start_time(self, offset: datetime.timedelta) -> None:
self._session_start += offset