| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2016 Codethink Limited |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this library. If not, see <http://www.gnu.org/licenses/>. |
| # |
| # Authors: |
| # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> |
| # Jürg Billeter <juerg.billeter@codethink.co.uk> |
| |
| # System imports |
| from collections import deque |
| |
| # Local imports |
| from .job import Job |
| |
| |
| # Indicates the kind of activity |
| # |
| # |
| class QueueType(): |
| # Tasks which download stuff from the internet |
| FETCH = 1 |
| |
| # CPU/Disk intensive tasks |
| BUILD = 2 |
| |
| # Tasks which upload stuff to the internet |
| PUSH = 3 |
| |
| |
| # Queue() |
| # |
| # |
| class Queue(): |
| |
| # These should be overridden on class data of of concrete Queue implementations |
| action_name = None |
| complete_name = None |
| queue_type = None |
| |
| def __init__(self): |
| self.scheduler = None |
| self.wait_queue = deque() |
| self.done_queue = deque() |
| self.active_jobs = [] |
| self.max_retries = 0 |
| |
| # For the frontend to know how many elements |
| # were successfully processed, failed, or skipped |
| # as they did not require processing. |
| # |
| self.failed_elements = [] |
| self.processed_elements = [] |
| self.skipped_elements = [] |
| |
| # Assert the subclass has setup class data |
| assert(self.action_name is not None) |
| assert(self.complete_name is not None) |
| assert(self.queue_type is not None) |
| |
| ##################################################### |
| # Abstract Methods for Queue implementations # |
| ##################################################### |
| |
| # process() |
| # |
| # Abstract method for processing an element |
| # |
| # Args: |
| # element (Element): An element to process |
| # |
| # Returns: |
| # (any): An optional something to be returned |
| # for every element successfully processed |
| # |
| # |
| def process(self, element): |
| pass |
| |
| # ready() |
| # |
| # Abstract method for reporting whether an element |
| # is ready for processing in this queue or not. |
| # |
| # Args: |
| # element (Element): An element to process |
| # |
| # Returns: |
| # (bool): Whether the element is ready for processing |
| # |
| def ready(self, element): |
| return True |
| |
| # skip() |
| # |
| # Abstract method for reporting whether an element |
| # can be skipped for this phase. |
| # |
| # Args: |
| # element (Element): An element to process |
| # |
| # Returns: |
| # (bool): Whether the element can be skipped |
| # |
| def skip(self, element): |
| return False |
| |
| # done() |
| # |
| # Abstract method for handling a successful job completion. |
| # |
| # Args: |
| # element (Element): The element which completed processing |
| # result (any): The return value of the process() implementation |
| # returncode (int): The process return code, 0 = success |
| # |
| # Returns: |
| # (bool): True if the element should appear to be processsed, |
| # Otherwise False will count the element as "skipped" |
| # |
| def done(self, element, result, returncode): |
| pass |
| |
| ##################################################### |
| # Queue internals and Scheduler facing APIs # |
| ##################################################### |
| |
| # Attach to the scheduler |
| def attach(self, scheduler): |
| self.scheduler = scheduler |
| if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH: |
| self.max_retries = scheduler.context.sched_network_retries |
| |
| def enqueue(self, elts): |
| if not elts: |
| return |
| |
| # Place skipped elements directly on the done queue |
| elts = list(elts) |
| skip = [elt for elt in elts if self.skip(elt)] |
| wait = [elt for elt in elts if elt not in skip] |
| |
| self.wait_queue.extend(wait) |
| self.done_queue.extend(skip) |
| self.skipped_elements.extend(skip) |
| |
| def dequeue(self): |
| while len(self.done_queue) > 0: |
| yield self.done_queue.popleft() |
| |
| def process_ready(self): |
| scheduler = self.scheduler |
| unready = [] |
| |
| while len(self.wait_queue) > 0 and scheduler.get_job_token(self.queue_type): |
| element = self.wait_queue.popleft() |
| |
| if not self.ready(element): |
| scheduler.put_job_token(self.queue_type) |
| unready.append(element) |
| continue |
| elif self.skip(element): |
| scheduler.put_job_token(self.queue_type) |
| self.done_queue.append(element) |
| self.skipped_elements.append(element) |
| continue |
| |
| job = Job(scheduler, element, self.action_name) |
| scheduler.job_starting(job) |
| |
| job.spawn(self.process, self.job_done, self.max_retries) |
| self.active_jobs.append(job) |
| |
| # These were not ready but were in the beginning, give em |
| # first priority again next time around |
| self.wait_queue.extendleft(unready) |
| |
| def job_done(self, job, returncode, element): |
| |
| # Shutdown the job |
| job.shutdown() |
| self.active_jobs.remove(job) |
| |
| # Give the result of the job to the Queue implementor, |
| # and determine if it should be considered as processed |
| # or skipped. |
| if self.done(element, job.result, returncode): |
| skip = False |
| else: |
| skip = True |
| |
| if returncode == 0: |
| self.done_queue.append(element) |
| if skip: |
| self.skipped_elements.append(element) |
| else: |
| self.processed_elements.append(element) |
| else: |
| self.failed_elements.append(element) |
| |
| # Give the token for this job back to the scheduler |
| # immediately before invoking another round of scheduling |
| self.scheduler.put_job_token(self.queue_type) |
| |
| # Notify frontend |
| self.scheduler.job_completed(self, job, returncode == 0) |
| |
| self.scheduler.sched() |