| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # Authors: |
| # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> |
| # Jürg Billeter <juerg.billeter@codethink.co.uk> |
| |
| # System imports |
| import os |
| from collections import deque |
| import heapq |
| import traceback |
| from typing import TYPE_CHECKING |
| |
| # Local imports |
| from ..jobs import ElementJob, JobStatus |
| from ..resources import ResourceType |
| |
| # BuildStream toplevel imports |
| from ..._exceptions import BstError, ImplError, set_last_task_error |
| from ..._message import Message, MessageType |
| from ...types import FastEnum |
| |
| if TYPE_CHECKING: |
| from typing import List, Optional |
| |
| |
| # Queue status for a given element |
| # |
| # |
| class QueueStatus(FastEnum): |
| # The element is not yet ready to be processed in the queue. |
| PENDING = 1 |
| |
| # The element can skip this queue. |
| SKIP = 2 |
| |
| # The element is ready for processing in this queue. |
| READY = 3 |
| |
| |
| # Queue() |
| # |
| # Args: |
| # scheduler (Scheduler): The Scheduler |
| # |
| class Queue: |
| |
| # These should be overridden on class data of of concrete Queue implementations |
| action_name = None # type: Optional[str] |
| complete_name = None # type: Optional[str] |
| # Resources this queues' jobs want |
| resources = [] # type: List[int] |
| |
| def __init__(self, scheduler, *, imperative=False): |
| |
| # |
| # Private members |
| # |
| self._scheduler = scheduler |
| self._resources = scheduler.resources # Shared resource pool |
| self._ready_queue = [] # Ready elements |
| self._done_queue = deque() # Processed / Skipped elements |
| self._max_retries = 0 |
| self._queued_elements = 0 # Number of elements queued |
| |
| self._required_element_check = False # Whether we should check that elements are required before enqueuing |
| |
| # |
| # Public members |
| # |
| self.imperative = imperative |
| |
| # Assert the subclass has setup class data |
| assert self.action_name is not None |
| assert self.complete_name is not None |
| |
| if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: |
| self._max_retries = scheduler.context.sched_network_retries |
| |
| self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name) |
| |
| # destroy() |
| # |
| # Explicitly performs all cleanup tasks for this queue |
| # |
| # Note: Doing this explicitly is preferred to a __del__ method because |
| # it is not at the mercy of the garbage collector |
| def destroy(self): |
| self._scheduler._state.remove_task_group(self.action_name) |
| |
| ##################################################### |
| # Abstract Methods for Queue implementations # |
| ##################################################### |
| |
| # get_process_func() |
| # |
| # Abstract method, returns a callable for processing an element. |
| # |
| # The callable should fit the signature `process(element: Element) -> any`. |
| # |
| # Note that the callable may be executed in a child process, so the return |
| # value should be a simple object (must be pickle-able, i.e. strings, |
| # lists, dicts, numbers, but not Element instances). This is sent to back |
| # to the main process. |
| # |
| # This method is the only way for a queue to affect elements, and so is |
| # not optional to implement. |
| # |
| # Returns: |
| # (Callable[[Element], Any]): The callable for processing elements. |
| # |
| def get_process_func(self): |
| raise NotImplementedError() |
| |
| # status() |
| # |
| # Abstract method for reporting the immediate status of an element. The status |
| # determines whether an element can/cannot be pushed into the queue, or even |
| # skip the queue entirely, when called. |
| # |
| # Args: |
| # element (Element): An element to process |
| # |
| # Returns: |
| # (QueueStatus): The element status |
| # |
| def status(self, element): |
| return QueueStatus.READY |
| |
| # done() |
| # |
| # Abstract method for handling a successful job completion. |
| # |
| # Args: |
| # job (Job): The job which completed processing |
| # element (Element): The element which completed processing |
| # result (any): The return value of the process() implementation |
| # status (JobStatus): The return status of the Job |
| # |
| def done(self, job, element, result, status): |
| pass |
| |
| ##################################################### |
| # Virtual Methods for Queue implementations # |
| ##################################################### |
| |
| # register_pending_element() |
| # |
| # Virtual method for registering a queue specific callback |
| # to an Element which is not immediately ready to advance |
| # to the next queue |
| # |
| # Args: |
| # element (Element): The element waiting to be pushed into the queue |
| # |
| def register_pending_element(self, element): |
| raise ImplError("Queue type: {} does not implement register_pending_element()".format(self.action_name)) |
| |
| ##################################################### |
| # Scheduler / Pipeline facing APIs # |
| ##################################################### |
| |
| # enqueue() |
| # |
| # Enqueues some elements |
| # |
| # Args: |
| # elts (list): A list of Elements |
| # |
| def enqueue(self, elts): |
| if not elts: |
| return |
| |
| # Obtain immediate element status |
| for elt in elts: |
| if self._required_element_check and not elt._is_required(): |
| elt._set_required_callback(self._enqueue_element) |
| else: |
| self._enqueue_element(elt) |
| |
| # dequeue() |
| # |
| # A generator which dequeues the elements which |
| # are ready to exit the queue. |
| # |
| # Yields: |
| # (Element): Elements being dequeued |
| # |
| def dequeue(self): |
| while self._done_queue: |
| yield self._done_queue.popleft() |
| |
| # dequeue_ready() |
| # |
| # Reports whether any elements can be promoted to other queues |
| # |
| # Returns: |
| # (bool): Whether there are elements ready |
| # |
| def dequeue_ready(self): |
| return any(self._done_queue) |
| |
| # harvest_jobs() |
| # |
| # Spawn as many jobs from the ready queue for which resources |
| # can be reserved. |
| # |
| # Priority is first given to elements which have been assigned a lower |
| # depth (see Element._set_depth()), and then to elements which have |
| # been enqueued earlier. |
| # |
| # Returns: |
| # ([Job]): A list of jobs which can be run now |
| # |
| def harvest_jobs(self): |
| ready = [] |
| while self._ready_queue: |
| # Now reserve them |
| reserved = self._resources.reserve(self.resources) |
| if not reserved: |
| break |
| |
| _, _, element = heapq.heappop(self._ready_queue) |
| ready.append(element) |
| |
| return [ |
| ElementJob( |
| self._scheduler, |
| self.action_name, |
| self._element_log_path(element), |
| element=element, |
| queue=self, |
| action_cb=self.get_process_func(), |
| complete_cb=self._job_done, |
| max_retries=self._max_retries, |
| ) |
| for element in ready |
| ] |
| |
| # set_required_element_check() |
| # |
| # This ensures that, for the first non-track queue, we must check |
| # whether elements are required before enqueuing them |
| def set_required_element_check(self): |
| self._required_element_check = True |
| |
| # any_failed_elements() |
| # |
| # Returns whether any elements in this queue have failed their jobs |
| def any_failed_elements(self): |
| return any(self._task_group.failed_tasks) |
| |
| ##################################################### |
| # Private Methods # |
| ##################################################### |
| |
| # _update_workspaces() |
| # |
| # Updates and possibly saves the workspaces in the |
| # main data model in the main process after a job completes. |
| # |
| # Args: |
| # element (Element): The element which completed |
| # |
| def _update_workspaces(self, element): |
| # FIXME: Does this really needs to be done for every job or only some? |
| # If some, we should only run it for those. |
| workspace = element._get_workspace() |
| |
| # Handle any workspace modifications now |
| # |
| if workspace: |
| context = element._get_context() |
| workspaces = context.get_workspaces() |
| if workspaces.update_workspace(element._get_full_name(), workspace.to_dict()): |
| try: |
| workspaces.save_config() |
| except BstError as e: |
| self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e)) |
| except Exception: # pylint: disable=broad-except |
| self._message( |
| element, |
| MessageType.BUG, |
| "Unhandled exception while saving workspaces", |
| detail=traceback.format_exc(), |
| ) |
| |
| # _job_done() |
| # |
| # A callback reported by the Job() when a job completes |
| # |
| # This will call the Queue implementation specific Queue.done() |
| # implementation and trigger the scheduler to reschedule. |
| # |
| # See the Job object for an explanation of the call signature |
| # |
| def _job_done(self, job, element, status, result): |
| |
| # Now release the resources we reserved |
| # |
| self._resources.release(self.resources) |
| |
| # Update values that need to be synchronized in the main task |
| # before calling any queue implementation |
| self._update_workspaces(element) |
| |
| # Give the result of the job to the Queue implementor, |
| # and determine if it should be considered as processed |
| # or skipped. |
| try: |
| self.done(job, element, result, status) |
| except BstError as e: |
| |
| # Report error and mark as failed |
| # |
| self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) |
| self._task_group.add_failed_task(element._get_full_name()) |
| |
| # Treat this as a task error as it's related to a task |
| # even though it did not occur in the task context |
| # |
| # This just allows us stronger testing capability |
| # |
| set_last_task_error(e.domain, e.reason) |
| |
| except Exception: # pylint: disable=broad-except |
| |
| # Report unhandled exceptions and mark as failed |
| # |
| self._message( |
| element, MessageType.BUG, "Unhandled exception in post processing", detail=traceback.format_exc() |
| ) |
| self._task_group.add_failed_task(element._get_full_name()) |
| else: |
| # All elements get placed on the done queue for later processing. |
| self._done_queue.append(element) |
| |
| # These lists are for bookkeeping purposes for the UI and logging. |
| if status == JobStatus.SKIPPED or job.get_terminated(): |
| self._task_group.add_skipped_task() |
| elif status == JobStatus.OK: |
| self._task_group.add_processed_task() |
| else: |
| self._task_group.add_failed_task(element._get_full_name()) |
| |
| # Convenience wrapper for Queue implementations to send |
| # a message for the element they are processing |
| def _message(self, element, message_type, brief, **kwargs): |
| message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) |
| self._scheduler.context.messenger.message(message) |
| |
| def _element_log_path(self, element): |
| project = element._get_project() |
| key = element._get_display_key() |
| action = self.action_name.lower() |
| logfile = "{key}-{action}".format(key=key.brief, action=action) |
| |
| return os.path.join(project.name, element.normal_name, logfile) |
| |
| # _enqueue_element() |
| # |
| # Enqueue an Element upon a callback to a specific queue |
| # Here we check whether an element is either immediately ready to be processed |
| # in the current queue or whether it can skip the queue. Element's which are |
| # not yet ready to be processed or cannot skip will have the appropriate |
| # callback registered |
| # |
| # Args: |
| # element (Element): The Element to enqueue |
| # |
| def _enqueue_element(self, element): |
| status = self.status(element) |
| |
| if status == QueueStatus.SKIP: |
| # Place skipped elements into the done queue immediately |
| self._task_group.add_skipped_task() |
| self._done_queue.append(element) # Elements to proceed to the next queue |
| elif status == QueueStatus.READY: |
| # Push elements which are ready to be processed immediately into the queue |
| heapq.heappush(self._ready_queue, (element._depth, self._queued_elements, element)) |
| self._queued_elements += 1 |
| else: |
| # Register a queue specific callback for pending elements |
| self.register_pending_element(element) |