blob: 4a552b0b9775433200f53703c5c463767b7ef3b1 [file] [log] [blame]
# Copyright (C) 2019 Bloomberg Finance LP
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <>.
import datetime
from collections import OrderedDict
# TaskGroup
# The state data stored for a group of tasks (usually scheduler queues)
# Args:
# name (str): The name of the Task Group, e.g. 'build'
# state (State): The state object
# complete_name (str): Optional name for frontend status rendering, e.g. 'built'
class TaskGroup:
def __init__(self, name, state, complete_name=None): = name
self.complete_name = complete_name
self.processed_tasks = 0
self.skipped_tasks = 0
# NOTE: failed_tasks is a list of strings instead of an integer count
# because the frontend requires the full list of failed tasks to
# know whether to print failure messages for a given element.
self.failed_tasks = []
self._state = state
self._update_task_group_cbs = []
# Core-facing APIs to drive notifications #
# add_processed_task()
# Update the TaskGroup's count of processed tasks and notify of changes
# This is a core-facing API and should not be called from the frontend
def add_processed_task(self):
self.processed_tasks += 1
for cb in self._state._task_groups_changed_cbs:
# add_skipped_task()
# Update the TaskGroup's count of skipped tasks and notify of changes
# This is a core-facing API and should not be called from the frontend
def add_skipped_task(self):
self.skipped_tasks += 1
for cb in self._state._task_groups_changed_cbs:
# add_failed_task()
# Update the TaskGroup's list of failed tasks and notify of changes
# Args:
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# This is a core-facing API and should not be called from the frontend
def add_failed_task(self, full_name):
for cb in self._state._task_groups_changed_cbs:
# State
# The state data that is stored for the purpose of sharing with the frontend.
# BuildStream's Core is responsible for making changes to this data.
# BuildStream's Frontend may register callbacks with State to be notified
# when parts of State change, and read State to know what has changed.
# Args:
# session_start (datetime): The time the session started
class State:
def __init__(self, session_start):
self._session_start = session_start
self.task_groups = OrderedDict() # key is TaskGroup name
# Note: A Task's full_name is technically unique, but only accidentally.
self.tasks = OrderedDict() # key is a tuple of action_name and full_name
self._task_added_cbs = []
self._task_removed_cbs = []
self._task_changed_cbs = []
self._task_groups_changed_cbs = []
self._task_failed_cbs = []
self._task_retry_cbs = []
# Frontend-facing notification APIs #
# register_task_added_callback()
# Registers a callback to be notified when a task is added
# Args:
# callback (function): The callback to be notified
# Callback Args:
# task_id (str): The unique identifier of the task
def register_task_added_callback(self, callback):
# unregister_task_added_callback()
# Unregisters a callback previously registered by
# register_task_added_callback()
# Args:
# callback (function): The callback to be removed
def unregister_task_added_callback(self, callback):
# register_task_removed_callback()
# Registers a callback to be notified when a task is removed
# Args:
# callback (function): The callback to be notified
# Callback Args:
# task_id (str): The unique identifier of the task
def register_task_removed_callback(self, callback):
# unregister_task_removed_callback()
# Unregisters a callback previously registered by
# register_task_removed_callback()
# Args:
# callback (function): The callback to be notified
def unregister_task_removed_callback(self, callback):
# register_task_changed_callback()
# Register a callback to be notified when a task has changed
# Args:
# callback (function): The callback to be notified
# Callback Args:
# task_id (str): The unique identifier of the task
def register_task_changed_callback(self, callback):
# unregister_task_changed_callback()
# Unregisters a callback previously registered by
# register_task_changed_callback()
# Args:
# callback (function): The callback to be notified
def unregister_task_changed_callback(self, callback):
# register_task_failed_callback()
# Registers a callback to be notified when a task has failed
# Args:
# callback (function): The callback to be notified
# Callback Args:
# task_id (str): The unique identifier of the task
# element (tuple): (optionally) The element unique_id and display keys if an
# element job
def register_task_failed_callback(self, callback):
# unregister_task_failed_callback()
# Unregisters a callback previously registered by
# register_task_failed_callback()
# Args:
# callback (function): The callback to be removed
def unregister_task_failed_callback(self, callback):
# register_task_retry_callback()
# Registers a callback to be notified when a task is to be retried
# Args:
# callback (function): The callback to be notified
# Callback Args:
# task_id (str): The unique identifier of the task
# unique_id: The unique id of the plugin instance to look up
def register_task_retry_callback(self, callback):
# Core-facing APIs for driving notifications #
# add_task_group()
# Notification that a new task group has been added
# This is a core-facing API and should not be called from the frontend
# Args:
# name (str): The name of the task group, e.g. 'build'
# complete_name (str): Optional name to be used for frontend status rendering, e.g. 'built'
# Returns:
# TaskGroup: The task group created
def add_task_group(self, name, complete_name=None):
assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups)
group = TaskGroup(name, self, complete_name)
self.task_groups[name] = group
return group
# remove_task_group()
# Notification that a task group has been removed
# This is a core-facing API and should not be called from the frontend
# Args:
# name (str): The name of the task group, e.g. 'build'
def remove_task_group(self, name):
# Rely on 'del' to raise an error when removing nonexistent task groups
del self.task_groups[name]
# add_task()
# Add a task and send appropriate notifications
# This is a core-facing API and should not be called from the frontend
# Args:
# task_id (str): The unique identifier of the task
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset (timedelta): (Optional) The time the task started, relative
# to buildstream's start time. Note scheduler tasks
# use this as they don't report relative to wallclock time
# if the Scheduler has been suspended.
def add_task(self, task_id, action_name, full_name, elapsed_offset=None):
assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format(
action_name, full_name, task_id, self.tasks
if not elapsed_offset:
elapsed_offset = self.elapsed_time()
task = _Task(self, task_id, action_name, full_name, elapsed_offset)
self.tasks[task_id] = task
for cb in self._task_added_cbs:
return task
# remove_task()
# Remove the task and send appropriate notifications
# This is a core-facing API and should not be called from the frontend
# Args:
# task_id (str): The unique identifier of the task
def remove_task(self, task_id):
# Rely on 'del' to raise an error when removing nonexistent tasks
del self.tasks[task_id]
for cb in self._task_removed_cbs:
# fail_task()
# Notify all registered callbacks that a task has failed.
# This is separate from the tasks changed callbacks because a failed task
# requires the frontend to intervene to decide what happens next.
# This is a core-facing API and should not be called from the frontend
# Args:
# task_id (str): The unique identifier of the task
# element (tuple): (optionally) The element unique_id and display keys if an
# element job
def fail_task(self, task_id, element=None):
for cb in self._task_failed_cbs:
cb(task_id, element)
# retry_task()
# Notify all registered callbacks that a task is to be retried.
# This is a core-facing API and should not be called from the frontend
# Args:
# task_id (str): The unique identifier of the task
# unique_id: The unique id of the plugin instance to look up
def retry_task(self, task_id: str, unique_id: str) -> None:
for cb in self._task_retry_cbs:
cb(task_id, unique_id)
# elapsed_time()
# Fetches the current session elapsed time
# Args:
# start_time(time): Optional explicit start time, relative to caller.
# Returns:
# (timedelta): The amount of time since the start of the session,
# discounting any time spent while jobs were suspended if
# start_time given relative to the Scheduler
def elapsed_time(self, start_time=None):
time_now =
if start_time is None:
start_time = self._session_start or time_now
return time_now - start_time
# offset_start_time()
# Update the 'start' time of the application by a given offset
# This allows modifying the time spent building when BuildStream
# gets paused then restarted, to give an accurate view of the real
# time spend building.
# Args:
# offset: the offset to add to the start time
def offset_start_time(self, offset: datetime.timedelta) -> None:
self._session_start += offset
# _Task
# The state data stored for an individual task
# Args:
# state (State): The State object
# task_id (str): The unique identifier of the task
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset (timedelta): The time the task started, relative to
# buildstream's start time.
class _Task:
def __init__(self, state, task_id, action_name, full_name, elapsed_offset):
self._state = state = task_id
self.action_name = action_name
self.full_name = full_name
self.elapsed_offset = elapsed_offset
self.current_progress = None
self.maximum_progress = None
self._render_cb = None # Callback to call when something could be rendered
# set_render_cb()
# Sets the callback to be called when the Task has changed and should be rendered
# NOTE: This should probably be removed once the frontend is running
# separately from the scheduler, since renders could be triggered
# by the scheduler.
def set_render_cb(self, callback):
self._render_cb = callback
def set_current_progress(self, progress):
self.current_progress = progress
for cb in self._state._task_changed_cbs:
if self._render_cb:
def set_maximum_progress(self, progress):
self.maximum_progress = progress
for cb in self._state._task_changed_cbs:
if self._render_cb:
def add_current_progress(self):
if self.current_progress is None:
new_progress = 1
new_progress = self.current_progress + 1