Make Jobs abstract and element-independent
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 4675b0e..de910af 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -492,30 +492,37 @@
     def _tick(self, elapsed):
         self._maybe_render_status()
 
-    def _job_started(self, element, action_name):
-        self._status.add_job(element, action_name)
+    def _job_started(self, job):
+        self._status.add_job(job)
         self._maybe_render_status()
 
-    def _job_completed(self, element, queue, action_name, success):
-        self._status.remove_job(element, action_name)
+    def _job_completed(self, job, success):
+        self._status.remove_job(job)
         self._maybe_render_status()
 
         # Dont attempt to handle a failure if the user has already opted to
         # terminate
         if not success and not self.stream.terminated:
 
-            # Get the last failure message for additional context
-            failure = self._fail_messages.get(element._get_unique_id())
+            if hasattr(job, 'element'):
+                element = job.element
+                queue = job.queue
 
-            # XXX This is dangerous, sometimes we get the job completed *before*
-            # the failure message reaches us ??
-            if not failure:
-                self._status.clear()
-                click.echo("\n\n\nBUG: Message handling out of sync, " +
-                           "unable to retrieve failure message for element {}\n\n\n\n\n"
-                           .format(element), err=True)
+                # Get the last failure message for additional context
+                failure = self._fail_messages.get(element._get_unique_id())
+
+                # XXX This is dangerous, sometimes we get the job completed *before*
+                # the failure message reaches us ??
+                if not failure:
+                    self._status.clear()
+                    click.echo("\n\n\nBUG: Message handling out of sync, " +
+                               "unable to retrieve failure message for element {}\n\n\n\n\n"
+                               .format(element), err=True)
+                else:
+                    self._handle_failure(element, queue, failure)
             else:
-                self._handle_failure(element, queue, failure)
+                click.echo("\nTerminating all jobs\n", err=True)
+                self.stream.terminate()
 
     def _handle_failure(self, element, queue, failure):
 
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 3f66e00..7a2e719 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -77,9 +77,9 @@
     #    element (Element): The element of the job to track
     #    action_name (str): The action name for this job
     #
-    def add_job(self, element, action_name):
+    def add_job(self, job):
         elapsed = self._stream.elapsed_time
-        job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed)
+        job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed)
         self._jobs.append(job)
         self._need_alloc = True
 
@@ -91,7 +91,13 @@
     #    element (Element): The element of the job to track
     #    action_name (str): The action name for this job
     #
-    def remove_job(self, element, action_name):
+    def remove_job(self, job):
+        action_name = job.action_name
+        if not hasattr(job, 'element'):
+            element = None
+        else:
+            element = job.element
+
         self._jobs = [
             job for job in self._jobs
             if not (job.element is element and
@@ -358,15 +364,19 @@
 #
 # Args:
 #    context (Context): The Context
-#    element (Element): The element being processed
-#    action_name (str): The name of the action
+#    job (Job): The job being processed
 #    content_profile (Profile): Formatting profile for content text
 #    format_profile (Profile): Formatting profile for formatting text
 #    elapsed (datetime): The offset into the session when this job is created
 #
 class _StatusJob():
 
-    def __init__(self, context, element, action_name, content_profile, format_profile, elapsed):
+    def __init__(self, context, job, content_profile, format_profile, elapsed):
+        action_name = job.action_name
+        if not hasattr(job, 'element'):
+            element = None
+        else:
+            element = job.element
 
         #
         # Public members
@@ -374,6 +384,7 @@
         self.element = element            # The Element
         self.action_name = action_name    # The action name
         self.size = None                  # The number of characters required to render
+        self.full_name = element._get_full_name() if element else action_name
 
         #
         # Private members
@@ -386,7 +397,7 @@
         # Calculate the size needed to display
         self.size = 10  # Size of time code with brackets
         self.size += len(action_name)
-        self.size += len(element._get_full_name())
+        self.size += len(self.full_name)
         self.size += 3  # '[' + ':' + ']'
 
     # render()
@@ -403,7 +414,7 @@
             self._format_profile.fmt(']')
 
         # Add padding after the display name, before terminating ']'
-        name = self.element._get_full_name() + (' ' * padding)
+        name = self.full_name + (' ' * padding)
         text += self._format_profile.fmt('[') + \
             self._content_profile.fmt(self.action_name) + \
             self._format_profile.fmt(':') + \
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index a53a133..8e1140f 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -17,7 +17,7 @@
 #  Authors:
 #        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
 
-from .queues import Queue, QueueStatus, QueueType
+from .queues import Queue, QueueStatus
 
 from .queues.fetchqueue import FetchQueue
 from .queues.trackqueue import TrackQueue
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
new file mode 100644
index 0000000..0030f5c
--- /dev/null
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -0,0 +1 @@
+from .elementjob import ElementJob
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
new file mode 100644
index 0000000..68f4e04
--- /dev/null
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -0,0 +1,217 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tristan.maat@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from ruamel import yaml
+
+from ..._message import Message, MessageType
+from ...plugin import _plugin_lookup
+from ... import _signals
+
+from .job import Job
+
+
+# ElementJob()
+#
+# A job to run an element's commands. When this job is spawned
+# `action_cb` will be called, and when it completes `complete_cb` will
+# be called.
+#
+# Args:
+#    scheduler (Scheduler): The scheduler
+#    action_name (str): The queue action name
+#    max_retries (int): The maximum number of retries
+#    action_cb (callable): The function to execute on the child
+#    complete_cb (callable): The function to execute when the job completes
+#    element (Element): The element to work on
+#    kwargs: Remaining Job() constructor arguments
+#
+# Here is the calling signature of the action_cb:
+#
+#     action_cb():
+#
+#     This function will be called in the child task
+#
+#     Args:
+#        element (Element): The element passed to the Job() constructor
+#
+#     Returns:
+#        (object): Any abstract simple python object, including a string, int,
+#                  bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+#     complete_cb():
+#
+#     This function will be called when the child task completes
+#
+#     Args:
+#        job (Job): The job object which completed
+#        element (Element): The element passed to the Job() constructor
+#        success (bool): True if the action_cb did not raise an exception
+#        result (object): The deserialized object returned by the `action_cb`, or None
+#                         if `success` is False
+#
+class ElementJob(Job):
+    def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.queue = queue
+        self._element = element
+        self._action_cb = action_cb            # The action callable function
+        self._complete_cb = complete_cb        # The complete callable function
+
+    @property
+    def element(self):
+        return self._element
+
+    # _child_process()
+    #
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        return self._action_cb(self._element)
+
+    def _parent_complete(self, success, result):
+        self._complete_cb(self, self._element, success, self._result)
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+
+        with open(self._logfile, 'a') as log:
+            # Write one last line to the log and flush it to disk
+            def flush_log():
+
+                # If the process currently had something happening in the I/O stack
+                # then trying to reenter the I/O stack will fire a runtime error.
+                #
+                # So just try to flush as well as we can at SIGTERM time
+                try:
+                    # FIXME: Better logging
+
+                    log.write('\n\nAction {} for element {} forcefully terminated\n'
+                              .format(self.action_name, self._element.name))
+                    log.flush()
+                except RuntimeError:
+                    os.fsync(log.fileno())
+
+            self._element._set_log_handle(log)
+            with _signals.terminator(flush_log):
+                self._print_start_message(self._element, self._logfile)
+                yield self._logfile
+            self._element._set_log_handle(None)
+            self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(
+            Message(self._element._get_unique_id(),
+                    message_type,
+                    message,
+                    **args))
+
+    def _print_start_message(self, element, logfile):
+        self._message(MessageType.START, self.action_name, logfile=logfile)
+
+        # Print the element's environment at the beginning of any element's log file.
+        #
+        # This should probably be omitted for non-build tasks but it's harmless here
+        elt_env = element.get_environment()
+        env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+        self._message(MessageType.LOG,
+                      "Build environment for element {}".format(element.name),
+                      detail=env_dump, logfile=logfile)
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # Arguments:
+    #     message (str): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        # Tag them on the way out the door...
+        message.action_name = self.action_name
+        message.task_id = self._element._get_unique_id()
+
+        # Use the plugin for the task for the output, not a plugin
+        # which might be acting on behalf of the task
+        plugin = _plugin_lookup(message.task_id)
+
+        with plugin._output_file() as output:
+            message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name))
+            output.write('{}\n'.format(message_text))
+            output.flush()
+
+        return message
+
+    # _child_process_data()
+    #
+    # Abstract method to retrieve additional data that should be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        data = {}
+
+        workspace = self._element._get_workspace()
+
+        if workspace is not None:
+            data['workspace'] = workspace.to_dict()
+
+        return data
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py
similarity index 65%
rename from buildstream/_scheduler/job.py
rename to buildstream/_scheduler/jobs/job.py
index 8b9af93..580f9ff 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -26,20 +26,21 @@
 import traceback
 import asyncio
 import multiprocessing
-from ruamel import yaml
+from contextlib import contextmanager
+
+import psutil
 
 # BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType, unconditional_messages
-from ..plugin import _plugin_lookup
-from .. import _signals, utils
+from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._message import MessageType, unconditional_messages
+from ... import _signals, utils
 
 
 # Used to distinguish between status messages and return values
 class Envelope():
     def __init__(self, message_type, message):
-        self.message_type = message_type
-        self.message = message
+        self._message_type = message_type
+        self._message = message
 
 
 # Process class that doesn't call waitpid on its own.
@@ -54,54 +55,50 @@
 # Job()
 #
 # The Job object represents a parallel task, when calling Job.spawn(),
-# the given `action_cb` will be called in parallel to the calling process,
-# and `complete_cb` will be called with the action result in the calling
-# process when the job completes.
+# the given `Job._child_process` will be called in parallel to the
+# calling process, and `Job._parent_complete` will be called with the
+# action result in the calling process when the job completes.
 #
 # Args:
 #    scheduler (Scheduler): The scheduler
-#    element (Element): The element to operate on
 #    action_name (str): The queue action name
-#    action_cb (callable): The action function
-#    complete_cb (callable): The function to call when complete
+#    logfile (str): A template string that points to the logfile
+#                   that should be used - should contain {pid}.
+#    resources (iter(ResourceType)) - A set of resources this job
+#                                     wants to use.
+#    exclusive_resources (iter(ResourceType)) - A set of resources
+#                                               this job wants to use
+#                                               exclusively.
 #    max_retries (int): The maximum number of retries
 #
-# Here is the calling signature of the action_cb:
-#
-#     action_cb():
-#
-#     This function will be called in the child task
-#
-#     Args:
-#        element (Element): The element passed to the Job() constructor
-#
-#     Returns:
-#        (object): Any abstract simple python object, including a string, int,
-#                  bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-#     complete_cb():
-#
-#     This function will be called when the child task completes
-#
-#     Args:
-#        job (Job): The job object which completed
-#        element (Element): The element passed to the Job() constructor
-#        success (bool): True if the action_cb did not raise an exception
-#        result (object): The deserialized object returned by the `action_cb`, or None
-#                         if `success` is False
-#
 class Job():
 
-    def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
+    def __init__(self, scheduler, action_name, logfile, *,
+                 resources=None, exclusive_resources=None, max_retries=0):
+
+        if resources is None:
+            resources = set()
+        else:
+            resources = set(resources)
+        if exclusive_resources is None:
+            exclusive_resources = set()
+        else:
+            exclusive_resources = set(resources)
+
+        # Ensure nobody tries not use an exclusive resource.
+        assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
 
         #
         # Public members
         #
-        self.element = element           # The element we're processing
         self.action_name = action_name   # The action name for the Queue
-        self.workspace_dict = None       # A serialized Workspace object, after any modifications
+        self.child_data = None           # Data to be sent to the main process
+
+        # The resources this job wants to access
+        self.resources = resources
+        # Resources this job needs to access exclusively, i.e., no
+        # other job should be allowed to access them
+        self.exclusive_resources = exclusive_resources
 
         #
         # Private members
@@ -110,13 +107,12 @@
         self._queue = multiprocessing.Queue()  # A message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
-        self._action_cb = action_cb            # The action callable function
-        self._complete_cb = complete_cb        # The complete callable function
         self._listening = False                # Whether the parent is currently listening
         self._suspended = False                # Whether this job is currently suspended
         self._max_retries = max_retries        # Maximum number of automatic retries
         self._result = None                    # Return value of child action in the parent
         self._tries = 0                        # Try count, for retryable jobs
+        self._logfile = logfile
 
     # spawn()
     #
@@ -152,8 +148,7 @@
         # First resume the job if it's suspended
         self.resume(silent=True)
 
-        self._message(self.element, MessageType.STATUS,
-                      "{} terminating".format(self.action_name))
+        self._message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the queue
         self._parent_stop_listening()
@@ -184,9 +179,15 @@
     def kill(self):
 
         # Force kill
-        self._message(self.element, MessageType.WARN,
+        self._message(MessageType.WARN,
                       "{} did not terminate gracefully, killing".format(self.action_name))
-        utils._kill_process_tree(self._process.pid)
+
+        try:
+            utils._kill_process_tree(self._process.pid)
+        # This can happen if the process died of its own accord before
+        # we try to kill it
+        except psutil.NoSuchProcess:
+            return
 
     # suspend()
     #
@@ -194,7 +195,7 @@
     #
     def suspend(self):
         if not self._suspended:
-            self._message(self.element, MessageType.STATUS,
+            self._message(MessageType.STATUS,
                           "{} suspending".format(self.action_name))
 
             try:
@@ -219,13 +220,106 @@
     def resume(self, silent=False):
         if self._suspended:
             if not silent:
-                self._message(self.element, MessageType.STATUS,
+                self._message(MessageType.STATUS,
                               "{} resuming".format(self.action_name))
 
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
 
     #######################################################
+    #                  Abstract Methods                   #
+    #######################################################
+    # _parent_complete()
+    #
+    # This will be executed after the job finishes, and is expected to
+    # pass the result to the main thread.
+    #
+    # Args:
+    #    success (bool): Whether the job was successful.
+    #    result (any): The result returned by _child_process().
+    #
+    def _parent_complete(self, success, result):
+        raise ImplError("Job '{kind}' does not implement _parent_complete()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process()
+    #
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        raise ImplError("Job '{kind}' does not implement _child_process()"
+                        .format(kind=type(self).__name__))
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        raise ImplError("Job '{kind}' does not implement _child_logging_enabled()"
+                        .format(kind=type(self).__name__))
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        raise ImplError("Job '{kind}' does not implement _message()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process_data()
+    #
+    # Abstract method to retrieve additional data that should be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        return {}
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # This method is also expected to add process-specific information
+    # to the message (notably, action_name and task_id).
+    #
+    # Arguments:
+    #     message (str): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        raise ImplError("Job '{kind}' does not implement _child_log()"
+                        .format(kind=type(self).__name__))
+
+    #######################################################
     #                  Local Private Methods              #
     #######################################################
     #
@@ -237,24 +331,41 @@
     #
     #######################################################
 
-    # _message():
+    # _format_frontend_message()
     #
-    # Sends a message to the frontend
+    # Format a message from the frontend for logging purposes. This
+    # will prepend a time code and add other information to help
+    # determine what happened.
     #
     # Args:
-    #    plugin (Plugin): The plugin to send a message for
-    #    message_type (MessageType): The type of message to send
-    #    message (str): The message
-    #    kwargs: Remaining Message() constructor arguments
+    #    message (Message) - The message to create a text from.
+    #    name (str) - A name for the executing context.
     #
-    def _message(self, plugin, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(
-            Message(plugin._get_unique_id(),
-                    message_type,
-                    message,
-                    **args))
+    # Returns:
+    #    (str) The text to log.
+    #
+    def _format_frontend_message(self, message, name):
+        INDENT = "    "
+        EMPTYTIME = "--:--:--"
+        template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+
+        detail = ''
+        if message.detail is not None:
+            template += "\n\n{detail}"
+            detail = message.detail.rstrip('\n')
+            detail = INDENT + INDENT.join(detail.splitlines(True))
+
+        timecode = EMPTYTIME
+        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+            minutes, seconds = divmod(remainder, 60)
+            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+        return template.format(timecode=timecode,
+                               type=message.message_type.upper(),
+                               name=name,
+                               message=message.message,
+                               detail=detail)
 
     # _child_action()
     #
@@ -265,7 +376,7 @@
     #
     def _child_action(self, queue):
 
-        element = self.element
+        logfile = self._logfile
 
         # This avoids some SIGTSTP signals from grandchildren
         # getting propagated up to the master process
@@ -301,35 +412,24 @@
         # Time, log and and run the action function
         #
         with _signals.suspendable(stop_time, resume_time), \
-            element._logging_enabled(self.action_name) as filename:
-
-            self._message(element, MessageType.START, self.action_name, logfile=filename)
-
-            # Print the element's environment at the beginning of any element's log file.
-            #
-            # This should probably be omitted for non-build tasks but it's harmless here
-            elt_env = element.get_environment()
-            env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
-            self._message(element, MessageType.LOG,
-                          "Build environment for element {}".format(element.name),
-                          detail=env_dump, logfile=filename)
+            self._child_logging_enabled(logfile) as filename:
 
             try:
                 # Try the task action
-                result = self._action_cb(element)
+                result = self._child_process()
             except BstError as e:
                 elapsed = datetime.datetime.now() - starttime
 
                 if self._tries <= self._max_retries:
-                    self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
+                    self._message(MessageType.FAIL,
+                                  "Try #{} failed, retrying".format(self._tries),
                                   elapsed=elapsed)
                 else:
-                    self._message(element, MessageType.FAIL, str(e),
+                    self._message(MessageType.FAIL, str(e),
                                   elapsed=elapsed, detail=e.detail,
                                   logfile=filename, sandbox=e.sandbox)
 
-                # Report changes in the workspace, even if there was a handled failure
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
 
                 # Report the exception to the parent (for internal testing purposes)
                 self._child_send_error(e)
@@ -343,18 +443,19 @@
                 #
                 elapsed = datetime.datetime.now() - starttime
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
-                self._message(element, MessageType.BUG, self.action_name,
+
+                self._message(MessageType.BUG, self.action_name,
                               elapsed=elapsed, detail=detail,
                               logfile=filename)
                 self._child_shutdown(1)
 
             else:
                 # No exception occurred in the action
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - starttime
-                self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+                self._message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
                               logfile=filename)
 
                 # Shutdown needs to stay outside of the above context manager,
@@ -398,16 +499,6 @@
             envelope = Envelope('result', result)
             self._queue.put(envelope)
 
-    # _child_send_workspace()
-    #
-    # Sends the serialized workspace through the message queue, if any
-    #
-    def _child_send_workspace(self):
-        workspace = self.element._get_workspace()
-        if workspace:
-            envelope = Envelope('workspace', workspace.to_dict())
-            self._queue.put(envelope)
-
     # _child_shutdown()
     #
     # Shuts down the child process by cleaning up and exiting the process
@@ -419,44 +510,6 @@
         self._queue.close()
         sys.exit(exit_code)
 
-    # _child_log()
-    #
-    # Logs a Message to the process's dedicated log file
-    #
-    # Args:
-    #    plugin (Plugin): The plugin to log for
-    #    message (Message): The message to log
-    #
-    def _child_log(self, plugin, message):
-
-        with plugin._output_file() as output:
-            INDENT = "    "
-            EMPTYTIME = "--:--:--"
-
-            name = '[' + plugin.name + ']'
-
-            fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
-            detail = ''
-            if message.detail is not None:
-                fmt += "\n\n{detail}"
-                detail = message.detail.rstrip('\n')
-                detail = INDENT + INDENT.join(detail.splitlines(True))
-
-            timecode = EMPTYTIME
-            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
-                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60)
-                minutes, seconds = divmod(remainder, 60)
-                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
-            message_text = fmt.format(timecode=timecode,
-                                      type=message.message_type.upper(),
-                                      name=name,
-                                      message=message.message,
-                                      detail=detail)
-
-            output.write('{}\n'.format(message_text))
-            output.flush()
-
     # _child_message_handler()
     #
     # A Context delegate for handling messages, this replaces the
@@ -470,16 +523,8 @@
     #
     def _child_message_handler(self, message, context):
 
-        # Tag them on the way out the door...
-        message.action_name = self.action_name
-        message.task_id = self.element._get_unique_id()
-
-        # Use the plugin for the task for the output, not a plugin
-        # which might be acting on behalf of the task
-        plugin = _plugin_lookup(message.task_id)
-
         # Log first
-        self._child_log(plugin, message)
+        message = self._child_log(message)
 
         if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
             # Job will be retried, display failures as warnings in the frontend
@@ -519,7 +564,8 @@
             self.spawn()
             return
 
-        self._complete_cb(self, self.element, returncode == 0, self._result)
+        self._parent_complete(returncode == 0, self._result)
+        self._scheduler.job_completed(self, returncode == 0)
 
     # _parent_process_envelope()
     #
@@ -536,21 +582,22 @@
         if not self._listening:
             return
 
-        if envelope.message_type == 'message':
+        if envelope._message_type == 'message':
             # Propagate received messages from children
             # back through the context.
-            self._scheduler.context.message(envelope.message)
-        elif envelope.message_type == 'error':
+            self._scheduler.context.message(envelope._message)
+        elif envelope._message_type == 'error':
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message['domain'],
-                                envelope.message['reason'])
-        elif envelope.message_type == 'result':
+            set_last_task_error(envelope._message['domain'],
+                                envelope._message['reason'])
+        elif envelope._message_type == 'result':
             assert self._result is None
-            self._result = envelope.message
-        elif envelope.message_type == 'workspace':
-            self.workspace_dict = envelope.message
+            self._result = envelope._message
+        elif envelope._message_type == 'child_data':
+            # If we retry a job, we assign a new value to this
+            self.child_data = envelope._message
         else:
             raise Exception()
 
diff --git a/buildstream/_scheduler/queues/__init__.py b/buildstream/_scheduler/queues/__init__.py
index b9acef1..3b22939 100644
--- a/buildstream/_scheduler/queues/__init__.py
+++ b/buildstream/_scheduler/queues/__init__.py
@@ -1 +1 @@
-from .queue import Queue, QueueStatus, QueueType
+from .queue import Queue, QueueStatus
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 50ba312..7f8ac9e 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -18,7 +18,8 @@
 #        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which assembles elements
@@ -27,7 +28,7 @@
 
     action_name = "Build"
     complete_name = "Built"
-    queue_type = QueueType.BUILD
+    resources = [ResourceType.PROCESS]
 
     def process(self, element):
         element._assemble()
@@ -50,7 +51,7 @@
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, job, element, result, success):
 
         if success:
             # Inform element in main process that assembly is done
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bdff156..265890b 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -22,7 +22,8 @@
 from ... import Consistency
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which fetches element sources
@@ -31,7 +32,7 @@
 
     action_name = "Fetch"
     complete_name = "Fetched"
-    queue_type = QueueType.FETCH
+    resources = [ResourceType.DOWNLOAD]
 
     def __init__(self, scheduler, skip_cached=False):
         super().__init__(scheduler)
@@ -66,7 +67,7 @@
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index b4f5b0d..efaa59e 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -19,7 +19,8 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which pulls element artifacts
@@ -28,7 +29,7 @@
 
     action_name = "Pull"
     complete_name = "Pulled"
-    queue_type = QueueType.FETCH
+    resources = [ResourceType.UPLOAD]
 
     def process(self, element):
         # returns whether an artifact was downloaded or not
@@ -51,7 +52,7 @@
         else:
             return QueueStatus.SKIP
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 624eefd..568e053 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -19,7 +19,8 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which pushes element artifacts
@@ -28,7 +29,7 @@
 
     action_name = "Push"
     complete_name = "Pushed"
-    queue_type = QueueType.PUSH
+    resources = [ResourceType.UPLOAD]
 
     def process(self, element):
         # returns whether an artifact was uploaded or not
@@ -40,7 +41,7 @@
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index d0c4828..8ca3ac0 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -19,32 +19,20 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 # System imports
+import os
 from collections import deque
 from enum import Enum
 import traceback
 
 # Local imports
-from ..job import Job
+from ..jobs import ElementJob
+from ..resources import ResourceType
 
 # BuildStream toplevel imports
 from ..._exceptions import BstError, set_last_task_error
 from ..._message import Message, MessageType
 
 
-# Indicates the kind of activity
-#
-#
-class QueueType():
-    # Tasks which download stuff from the internet
-    FETCH = 1
-
-    # CPU/Disk intensive tasks
-    BUILD = 2
-
-    # Tasks which upload stuff to the internet
-    PUSH = 3
-
-
 # Queue status for a given element
 #
 #
@@ -69,14 +57,13 @@
     # These should be overridden on class data of of concrete Queue implementations
     action_name = None
     complete_name = None
-    queue_type = None
+    resources = []                     # Resources this queues' jobs want
 
     def __init__(self, scheduler):
 
         #
         # Public members
         #
-        self.active_jobs = []          # List of active ongoing Jobs, for scheduler observation
         self.failed_elements = []      # List of failed elements, for the frontend
         self.processed_elements = []   # List of processed elements, for the frontend
         self.skipped_elements = []     # List of skipped elements, for the frontend
@@ -88,13 +75,13 @@
         self._wait_queue = deque()
         self._done_queue = deque()
         self._max_retries = 0
-        if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH:
-            self._max_retries = scheduler.context.sched_network_retries
 
         # Assert the subclass has setup class data
         assert self.action_name is not None
         assert self.complete_name is not None
-        assert self.queue_type is not None
+
+        if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
+            self._max_retries = scheduler.context.sched_network_retries
 
     #####################################################
     #     Abstract Methods for Queue implementations    #
@@ -143,6 +130,7 @@
     # Abstract method for handling a successful job completion.
     #
     # Args:
+    #    job (Job): The job which completed processing
     #    element (Element): The element which completed processing
     #    result (any): The return value of the process() implementation
     #    success (bool): True if the process() implementation did not
@@ -152,7 +140,7 @@
     #    (bool): True if the element should appear to be processsed,
     #            Otherwise False will count the element as "skipped"
     #
-    def done(self, element, result, success):
+    def done(self, job, element, result, success):
         pass
 
     #####################################################
@@ -170,10 +158,22 @@
         if not elts:
             return
 
+        # Note: The internal lists work with jobs. This is not
+        #       reflected in any external methods (except
+        #       pop/peek_ready_jobs).
+        def create_job(element):
+            logfile = self._element_log_path(element)
+            return ElementJob(self._scheduler, self.action_name,
+                              logfile, element=element, queue=self,
+                              resources=self.resources,
+                              action_cb=self.process,
+                              complete_cb=self._job_done,
+                              max_retries=self._max_retries)
+
         # Place skipped elements directly on the done queue
-        elts = list(elts)
-        skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
-        wait = [elt for elt in elts if elt not in skip]
+        jobs = [create_job(elt) for elt in elts]
+        skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
+        wait = [job for job in jobs if job not in skip]
 
         self._wait_queue.extend(wait)
         self._done_queue.extend(skip)
@@ -189,7 +189,7 @@
     #
     def dequeue(self):
         while self._done_queue:
-            yield self._done_queue.popleft()
+            yield self._done_queue.popleft().element
 
     # dequeue_ready()
     #
@@ -201,7 +201,10 @@
     def dequeue_ready(self):
         return any(self._done_queue)
 
-    # process_ready()
+    # pop_ready_jobs()
+    #
+    # Returns:
+    #     ([Job]): A list of jobs to run
     #
     # Process elements in the queue, moving elements which were enqueued
     # into the dequeue pool, and processing them if necessary.
@@ -211,46 +214,45 @@
     #
     #   o Elements which are QueueStatus.WAIT will not be effected
     #
-    #   o Elements which are QueueStatus.READY will be processed
-    #     and added to the Queue.active_jobs list as a result,
-    #     given that the scheduler allows the Queue enough tokens
-    #     for the given queue's job type
-    #
     #   o Elements which are QueueStatus.SKIP will move directly
     #     to the dequeue pool
     #
-    def process_ready(self):
-        scheduler = self._scheduler
+    #   o For Elements which are QueueStatus.READY a Job will be
+    #     created and returned to the caller, given that the scheduler
+    #     allows the Queue enough resources for the given job
+    #
+    def pop_ready_jobs(self):
         unready = []
+        ready = []
 
-        while self._wait_queue and scheduler.get_job_token(self.queue_type):
-            element = self._wait_queue.popleft()
+        while self._wait_queue:
+            job = self._wait_queue.popleft()
+            element = job.element
 
             status = self.status(element)
             if status == QueueStatus.WAIT:
-                scheduler.put_job_token(self.queue_type)
-                unready.append(element)
+                unready.append(job)
                 continue
             elif status == QueueStatus.SKIP:
-                scheduler.put_job_token(self.queue_type)
-                self._done_queue.append(element)
+                self._done_queue.append(job)
                 self.skipped_elements.append(element)
                 continue
 
             self.prepare(element)
-
-            job = Job(scheduler, element, self.action_name,
-                      self.process, self._job_done,
-                      max_retries=self._max_retries)
-            scheduler.job_starting(job)
-
-            job.spawn()
-            self.active_jobs.append(job)
+            ready.append(job)
 
         # These were not ready but were in the beginning, give em
         # first priority again next time around
         self._wait_queue.extendleft(unready)
 
+        return ready
+
+    def peek_ready_jobs(self):
+        def ready(job):
+            return self.status(job.element) == QueueStatus.READY
+
+        yield from (job for job in self._wait_queue if ready(job))
+
     #####################################################
     #                 Private Methods                   #
     #####################################################
@@ -265,12 +267,16 @@
     #    job (Job): The job which completed
     #
     def _update_workspaces(self, element, job):
+        workspace_dict = None
+        if job.child_data:
+            workspace_dict = job.child_data.get('workspace', None)
+
         # Handle any workspace modifications now
         #
-        if job.workspace_dict:
+        if workspace_dict:
             context = element._get_context()
             workspaces = context.get_workspaces()
-            if workspaces.update_workspace(element._get_full_name(), job.workspace_dict):
+            if workspaces.update_workspace(element._get_full_name(), workspace_dict):
                 try:
                     workspaces.save_config()
                 except BstError as e:
@@ -291,17 +297,15 @@
     #
     def _job_done(self, job, element, success, result):
 
-        # Remove from our jobs
-        self.active_jobs.remove(job)
-
-        # Update workspaces in the main task before calling any queue implementation
+        # Update values that need to be synchronized in the main task
+        # before calling any queue implementation
         self._update_workspaces(element, job)
 
         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
         # or skipped.
         try:
-            processed = self.done(element, result, success)
+            processed = self.done(job, element, result, success)
 
         except BstError as e:
 
@@ -330,7 +334,7 @@
             # No exception occured, handle the success/failure state in the normal way
             #
             if success:
-                self._done_queue.append(element)
+                self._done_queue.append(job)
                 if processed:
                     self.processed_elements.append(element)
                 else:
@@ -338,18 +342,22 @@
             else:
                 self.failed_elements.append(element)
 
-        # Give the token for this job back to the scheduler
-        # immediately before invoking another round of scheduling
-        self._scheduler.put_job_token(self.queue_type)
-
-        # Notify frontend
-        self._scheduler.job_completed(self, job, success)
-
-        self._scheduler.sched()
-
     # Convenience wrapper for Queue implementations to send
     # a message for the element they are processing
     def _message(self, element, message_type, brief, **kwargs):
         context = element._get_context()
         message = Message(element._get_unique_id(), message_type, brief, **kwargs)
         context.message(message)
+
+    def _element_log_path(self, element):
+        project = element._get_project()
+        context = element._get_context()
+
+        key = element._get_display_key()[1]
+        action = self.action_name.lower()
+        logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
+
+        directory = os.path.join(context.logdir, project.name, element.normal_name)
+
+        os.makedirs(directory, exist_ok=True)
+        return os.path.join(directory, logfile)
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index 3a65f01..c7a8f4c 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -23,7 +23,8 @@
 from ... import SourceError
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which tracks sources
@@ -32,7 +33,7 @@
 
     action_name = "Track"
     complete_name = "Tracked"
-    queue_type = QueueType.FETCH
+    resources = [ResourceType.DOWNLOAD]
 
     def process(self, element):
         return element._track()
@@ -47,7 +48,7 @@
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
new file mode 100644
index 0000000..bbf851b
--- /dev/null
+++ b/buildstream/_scheduler/resources.py
@@ -0,0 +1,105 @@
+class ResourceType():
+    CACHE = 0
+    DOWNLOAD = 1
+    PROCESS = 2
+    UPLOAD = 3
+
+
+class Resources():
+    def __init__(self, num_builders, num_fetchers, num_pushers):
+        self._max_resources = {
+            ResourceType.CACHE: 1,
+            ResourceType.DOWNLOAD: num_fetchers,
+            ResourceType.PROCESS: num_builders,
+            ResourceType.UPLOAD: num_pushers
+        }
+
+        # Resources jobs are currently using.
+        self._used_resources = {
+            ResourceType.CACHE: 0,
+            ResourceType.DOWNLOAD: 0,
+            ResourceType.PROCESS: 0,
+            ResourceType.UPLOAD: 0
+        }
+
+        # Resources jobs currently want exclusive access to. The set
+        # of jobs that have asked for exclusive access is the value -
+        # this is so that we can avoid scheduling any other jobs until
+        # *all* exclusive jobs that "register interest" have finished
+        # - which avoids starving them of scheduling time.
+        self._exclusive_resources = {
+            ResourceType.CACHE: set(),
+            ResourceType.DOWNLOAD: set(),
+            ResourceType.PROCESS: set(),
+            ResourceType.UPLOAD: set()
+        }
+
+    def clear_job_resources(self, job):
+        for resource in job.exclusive_resources:
+            self._exclusive_resources[resource].remove(hash(job))
+
+        for resource in job.resources:
+            self._used_resources[resource] -= 1
+
+    def reserve_exclusive_resources(self, job):
+        exclusive = job.exclusive_resources
+
+        # The very first thing we do is to register any exclusive
+        # resources this job may want. Even if the job is not yet
+        # allowed to run (because another job is holding the resource
+        # it wants), we can still set this - it just means that any
+        # job *currently* using these resources has to finish first,
+        # and no new jobs wanting these can be launched (except other
+        # exclusive-access jobs).
+        #
+        for resource in exclusive:
+            self._exclusive_resources[resource].add(hash(job))
+
+    def reserve_job_resources(self, job):
+        # First, we check if the job wants to access a resource that
+        # another job wants exclusive access to. If so, it cannot be
+        # scheduled.
+        #
+        # Note that if *both* jobs want this exclusively, we don't
+        # fail yet.
+        #
+        # FIXME: I *think* we can deadlock if two jobs want disjoint
+        #        sets of exclusive and non-exclusive resources. This
+        #        is currently not possible, but may be worth thinking
+        #        about.
+        #
+        for resource in job.resources - job.exclusive_resources:
+            # If our job wants this resource exclusively, we never
+            # check this, so we can get away with not (temporarily)
+            # removing it from the set.
+            if self._exclusive_resources[resource]:
+                return False
+
+        # Now we check if anything is currently using any resources
+        # this job wants exclusively. If so, the job cannot be
+        # scheduled.
+        #
+        # Since jobs that use a resource exclusively are also using
+        # it, this means only one exclusive job can ever be scheduled
+        # at a time, despite being allowed to be part of the exclusive
+        # set.
+        #
+        for exclusive in job.exclusive_resources:
+            if self._used_resources[exclusive] != 0:
+                return False
+
+        # Finally, we check if we have enough of each resource
+        # available. If we don't have enough, the job cannot be
+        # scheduled.
+        for resource in job.resources:
+            if (self._max_resources[resource] > 0 and
+                    self._used_resources[resource] >= self._max_resources[resource]):
+                return False
+
+        # Now we register the fact that our job is using the resources
+        # it asked for, and tell the scheduler that it is allowed to
+        # continue.
+        for resource in job.resources:
+            self._used_resources[resource] += 1
+
+        return True
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 7bfbc95..bc182db 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -21,12 +21,13 @@
 # System imports
 import os
 import asyncio
+from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
 
 # Local imports
-from .queues import QueueType
+from .resources import Resources
 
 
 # A decent return code for Scheduler.run()
@@ -69,6 +70,8 @@
         #
         # Public members
         #
+        self.active_jobs = []       # Jobs currently being run in the scheduler
+        self.waiting_jobs = []      # Jobs waiting for resources
         self.queues = None          # Exposed for the frontend to print summaries
         self.context = context      # The Context object shared with Queues
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
@@ -90,13 +93,9 @@
         self._suspendtime = None
         self._queue_jobs = True      # Whether we should continue to queue jobs
 
-        # Initialize task tokens with the number allowed by
-        # the user configuration
-        self._job_tokens = {
-            QueueType.FETCH: context.sched_fetchers,
-            QueueType.BUILD: context.sched_builders,
-            QueueType.PUSH: context.sched_pushers
-        }
+        self._resources = Resources(context.sched_builders,
+                                    context.sched_fetchers,
+                                    context.sched_pushers)
 
     # run()
     #
@@ -129,7 +128,7 @@
         self._connect_signals()
 
         # Run the queues
-        self.sched()
+        self._schedule_queue_jobs()
         self.loop.run_forever()
         self.loop.close()
 
@@ -209,18 +208,74 @@
             starttime = timenow
         return timenow - starttime
 
-    # sched()
+    # schedule_jobs()
+    #
+    # Args:
+    #     jobs ([Job]): A list of jobs to schedule
+    #
+    # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
+    # run as soon any other queueing jobs finish, provided sufficient
+    # resources are available for them to run
+    #
+    def schedule_jobs(self, jobs):
+        for job in jobs:
+            self.waiting_jobs.append(job)
+
+    # job_completed():
+    #
+    # Called when a Job completes
+    #
+    # Args:
+    #    queue (Queue): The Queue holding a complete job
+    #    job (Job): The completed Job
+    #    success (bool): Whether the Job completed with a success status
+    #
+    def job_completed(self, job, success):
+        self._resources.clear_job_resources(job)
+        self.active_jobs.remove(job)
+        self._job_complete_callback(job, success)
+        self._schedule_queue_jobs()
+        self._sched()
+
+    #######################################################
+    #                  Local Private Methods              #
+    #######################################################
+
+    # _sched()
     #
     # The main driving function of the scheduler, it will be called
-    # automatically when Scheduler.run() is called initially, and needs
-    # to be called whenever a job can potentially be scheduled, usually
-    # when a Queue completes handling of a job.
+    # automatically when Scheduler.run() is called initially,
     #
-    # This will process the Queues and pull elements through the Queues
+    def _sched(self):
+        for job in self.waiting_jobs:
+            self._resources.reserve_exclusive_resources(job)
+
+        for job in self.waiting_jobs:
+            if not self._resources.reserve_job_resources(job):
+                continue
+
+            job.spawn()
+            self.waiting_jobs.remove(job)
+            self.active_jobs.append(job)
+
+            if self._job_start_callback:
+                self._job_start_callback(job)
+
+        # If nothings ticking, time to bail out
+        if not self.active_jobs and not self.waiting_jobs:
+            self.loop.stop()
+
+    # _schedule_queue_jobs()
+    #
+    # Ask the queues what jobs they want to schedule and schedule
+    # them. This is done here so we can ask for new jobs when jobs
+    # from previous queues become available.
+    #
+    # This will process the Queues, pull elements through the Queues
     # and process anything that is ready.
     #
-    def sched(self):
-
+    def _schedule_queue_jobs(self):
+        ready = []
         process_queues = True
 
         while self._queue_jobs and process_queues:
@@ -233,90 +288,29 @@
 
                 # Dequeue processed elements for the next queue
                 elements = list(queue.dequeue())
-                elements = list(elements)
 
             # Kickoff whatever processes can be processed at this time
             #
-            # We start by queuing from the last queue first, because we want to
-            # give priority to queues later in the scheduling process in the case
-            # that multiple queues share the same token type.
+            # We start by queuing from the last queue first, because
+            # we want to give priority to queues later in the
+            # scheduling process in the case that multiple queues
+            # share the same token type.
             #
-            # This avoids starvation situations where we dont move on to fetch
-            # tasks for elements which failed to pull, and thus need all the pulls
-            # to complete before ever starting a build
-            for queue in reversed(self.queues):
-                queue.process_ready()
+            # This avoids starvation situations where we dont move on
+            # to fetch tasks for elements which failed to pull, and
+            # thus need all the pulls to complete before ever starting
+            # a build
+            ready.extend(chain.from_iterable(
+                queue.pop_ready_jobs() for queue in reversed(self.queues)
+            ))
 
-            # process_ready() may have skipped jobs, adding them to the done_queue.
-            # Pull these skipped elements forward to the next queue and process them.
+            # pop_ready_jobs() may have skipped jobs, adding them to
+            # the done_queue.  Pull these skipped elements forward to
+            # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        # If nothings ticking, time to bail out
-        ticking = 0
-        for queue in self.queues:
-            ticking += len(queue.active_jobs)
-
-        if ticking == 0:
-            self.loop.stop()
-
-    # get_job_token():
-    #
-    # Used by the Queue object to obtain a token for
-    # processing a Job, if a Queue does not receive a token
-    # then it must wait until a later time in order to
-    # process pending jobs.
-    #
-    # Args:
-    #    queue_type (QueueType): The type of token to obtain
-    #
-    # Returns:
-    #    (bool): Whether a token was handed out or not
-    #
-    def get_job_token(self, queue_type):
-        if self._job_tokens[queue_type] > 0:
-            self._job_tokens[queue_type] -= 1
-            return True
-        return False
-
-    # put_job_token():
-    #
-    # Return a job token to the scheduler. Tokens previously
-    # received with get_job_token() must be returned to
-    # the scheduler once the associated job is complete.
-    #
-    # Args:
-    #    queue_type (QueueType): The type of token to obtain
-    #
-    def put_job_token(self, queue_type):
-        self._job_tokens[queue_type] += 1
-
-    # job_starting():
-    #
-    # Called by the Queue when starting a Job
-    #
-    # Args:
-    #    job (Job): The starting Job
-    #
-    def job_starting(self, job):
-        if self._job_start_callback:
-            self._job_start_callback(job.element, job.action_name)
-
-    # job_completed():
-    #
-    # Called by the Queue when a Job completes
-    #
-    # Args:
-    #    queue (Queue): The Queue holding a complete job
-    #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
-    #
-    def job_completed(self, queue, job, success):
-        if self._job_complete_callback:
-            self._job_complete_callback(job.element, queue, job.action_name, success)
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
+        self.schedule_jobs(ready)
+        self._sched()
 
     # _suspend_jobs()
     #
@@ -326,9 +320,8 @@
         if not self.suspended:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.suspend()
+            for job in self.active_jobs:
+                job.suspend()
 
     # _resume_jobs()
     #
@@ -336,9 +329,8 @@
     #
     def _resume_jobs(self):
         if self.suspended:
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.resume()
+            for job in self.active_jobs:
+                job.resume()
             self.suspended = False
             self._starttime += (datetime.datetime.now() - self._suspendtime)
             self._suspendtime = None
@@ -401,19 +393,18 @@
         wait_limit = 20.0
 
         # First tell all jobs to terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                job.terminate()
+        for job in self.active_jobs:
+            job.terminate()
 
         # Now wait for them to really terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                elapsed = datetime.datetime.now() - wait_start
-                timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
-                if not job.terminate_wait(timeout):
-                    job.kill()
+        for job in self.active_jobs:
+            elapsed = datetime.datetime.now() - wait_start
+            timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+            if not job.terminate_wait(timeout):
+                job.kill()
 
-        self.loop.stop()
+        # Clear out the waiting jobs
+        self.waiting_jobs = []
 
     # Regular timeout for driving status in the UI
     def _tick(self):