blob: b2f0b705d87d1ac0a8207e52073a1eec556bdf75 [file] [log] [blame]
#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
from collections import OrderedDict
# TaskGroup
#
# The state data stored for a group of tasks (usually scheduler queues)
#
# Args:
# name (str): The name of the Task Group, e.g. 'build'
# state (State): The state object
#
class TaskGroup():
def __init__(self, name, state):
self.name = name
self.processed_tasks = 0
self.skipped_tasks = 0
# NOTE: failed_tasks is a list of strings instead of an integer count
# because the frontend requires the full list of failed tasks to
# know whether to print failure messages for a given element.
self.failed_tasks = []
self._state = state
self._update_task_group_cbs = []
###########################################
# Core-facing APIs to drive notifications #
###########################################
# add_processed_task()
#
# Update the TaskGroup's count of processed tasks and notify of changes
#
# This is a core-facing API and should not be called from the frontend
#
def add_processed_task(self):
self.processed_tasks += 1
for cb in self._state._task_groups_changed_cbs:
cb()
# add_skipped_task()
#
# Update the TaskGroup's count of skipped tasks and notify of changes
#
# This is a core-facing API and should not be called from the frontend
#
def add_skipped_task(self):
self.skipped_tasks += 1
for cb in self._state._task_groups_changed_cbs:
cb()
# add_failed_task()
#
# Update the TaskGroup's list of failed tasks and notify of changes
#
# Args:
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
#
# This is a core-facing API and should not be called from the frontend
#
def add_failed_task(self, full_name):
self.failed_tasks.append(full_name)
for cb in self._state._task_groups_changed_cbs:
cb()
# State
#
# The state data that is stored for the purpose of sharing with the frontend.
#
# BuildStream's Core is responsible for making changes to this data.
# BuildStream's Frontend may register callbacks with State to be notified
# when parts of State change, and read State to know what has changed.
class State():
def __init__(self):
self.task_groups = OrderedDict() # key is TaskGroup name
# Note: A Task's full_name is technically unique, but only accidentally.
self.tasks = OrderedDict() # key is a tuple of action_name and full_name
self._task_added_cbs = []
self._task_removed_cbs = []
self._task_groups_changed_cbs = []
self._task_failed_cbs = []
#####################################
# Frontend-facing notification APIs #
#####################################
# register_task_added_callback()
#
# Registers a callback to be notified when a task is added
#
# Args:
# callback (function): The callback to be notified
#
# Callback Args:
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
#
def register_task_added_callback(self, callback):
self._task_added_cbs.append(callback)
# unregister_task_added_callback()
#
# Unregisters a callback previously registered by
# register_task_added_callback()
#
# Args:
# callback (function): The callback to be removed
#
def unregister_task_added_callback(self, callback):
self._task_added_cbs.remove(callback)
# register_task_removed_callback()
#
# Registers a callback to be notified when a task is removed
#
# Args:
# callback (function): The callback to be notified
#
# Callback Args:
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
#
def register_task_removed_callback(self, callback):
self._task_removed_cbs.append(callback)
# unregister_task_removed_callback()
#
# Unregisters a callback previously registered by
# register_task_removed_callback()
#
# Args:
# callback (function): The callback to be notified
#
def unregister_task_removed_callback(self, callback):
self._task_removed_cbs.remove(callback)
# register_task_failed_callback()
#
# Registers a callback to be notified when a task has failed
#
# Args:
# callback (function): The callback to be notified
#
# Callback Args:
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# unique_id (int): (optionally) the element's unique ID, if the failure
# came from an element
#
def register_task_failed_callback(self, callback):
self._task_failed_cbs.append(callback)
# unregister_task_failed_callback()
#
# Unregisters a callback previously registered by
# register_task_failed_callback()
#
# Args:
# callback (function): The callback to be removed
#
def unregister_task_failed_callback(self, callback):
self._task_failed_cbs.remove(callback)
##############################################
# Core-facing APIs for driving notifications #
##############################################
# add_task_group()
#
# Notification that a new task group has been added
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# name (str): The name of the task group, e.g. 'build'
#
# Returns:
# TaskGroup: The task group created
#
def add_task_group(self, name):
assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups)
group = TaskGroup(name, self)
self.task_groups[name] = group
return group
# remove_task_group()
#
# Notification that a task group has been removed
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# name (str): The name of the task group, e.g. 'build'
#
def remove_task_group(self, name):
# Rely on 'del' to raise an error when removing nonexistent task groups
del self.task_groups[name]
# add_task()
#
# Add a task and send appropriate notifications
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# start_time (timedelta): The time the task started, relative to
# buildstream's start time.
#
def add_task(self, action_name, full_name, start_time):
task_key = (action_name, full_name)
assert task_key not in self.tasks, \
"Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks)
task = _Task(action_name, full_name, start_time)
self.tasks[task_key] = task
for cb in self._task_added_cbs:
cb(action_name, full_name)
# remove_task()
#
# Remove the task and send appropriate notifications
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
#
def remove_task(self, action_name, full_name):
# Rely on 'del' to raise an error when removing nonexistent tasks
del self.tasks[(action_name, full_name)]
for cb in self._task_removed_cbs:
cb(action_name, full_name)
# fail_task()
#
# Notify all registered callbacks that a task has failed.
#
# This is separate from the tasks changed callbacks because a failed task
# requires the frontend to intervene to decide what happens next.
#
# This is a core-facing API and should not be called from the frontend
#
# Args:
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# unique_id (int): (optionally) the element's unique ID, if the failure came from an element
#
def fail_task(self, action_name, full_name, unique_id=None):
for cb in self._task_failed_cbs:
cb(action_name, full_name, unique_id)
# _Task
#
# The state data stored for an individual task
#
# Args:
# action_name (str): The name of the action, e.g. 'build'
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# start_time (timedelta): The time the task started, relative to
# buildstream's start time.
class _Task():
def __init__(self, action_name, full_name, start_time):
self.action_name = action_name
self.full_name = full_name
self.start_time = start_time