WIP: use priority queue in schedular
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index d46fd4c..17ccb0a 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -39,3 +39,6 @@
def child_process_data(self):
return {}
+
+ def key(self):
+ return (100, 'cache-size')
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index 8bdbba0..3e860e2 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -32,3 +32,6 @@
def parent_complete(self, success, result):
if success:
self._artifacts.set_cache_size(result)
+
+ def key(self):
+ return (0, 'cleanup')
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 8ce5c06..1d6305c 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -22,6 +22,13 @@
from .job import Job
+_ACTIONS = {
+ "Build": 10,
+ "Fetch": 20,
+ "Pull": 30,
+ "Push": 40,
+ "Track": 50,
+}
# ElementJob()
#
@@ -113,3 +120,6 @@
data['workspace'] = workspace.to_dict()
return data
+
+ def key(self):
+ return (_ACTIONS.get(self.action_name, 100), self._element.name)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 60ae0d0..ff0ea83 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -353,6 +353,10 @@
def child_process_data(self):
return {}
+ def key(self):
+ raise ImplError("Job '{kind}' does not implement key()"
+ .format(kind=type(self).__name__))
+
#######################################################
# Local Private Methods #
#######################################################
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c730..6b849f3 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -25,6 +25,7 @@
import signal
import datetime
from contextlib import contextmanager
+from sortedcontainers import SortedList
# Local imports
from .resources import Resources, ResourceType
@@ -72,7 +73,7 @@
# Public members
#
self.active_jobs = [] # Jobs currently being run in the scheduler
- self.waiting_jobs = [] # Jobs waiting for resources
+ self.waiting_jobs = SortedList([], key=lambda job: job.key()) # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -222,7 +223,7 @@
#
def schedule_jobs(self, jobs):
for job in jobs:
- self.waiting_jobs.append(job)
+ self.waiting_jobs.add(job)
# job_completed():
#
diff --git a/setup.py b/setup.py
index 76610f0..95d9204 100755
--- a/setup.py
+++ b/setup.py
@@ -343,6 +343,7 @@
'jinja2 >= 2.10',
'protobuf >= 3.5',
'grpcio >= 1.10',
+ 'sortedcontainers >= 1.5.7',
],
entry_points=bst_install_entry_points,
tests_require=dev_requires,