switching to queue
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index e2a180c..77eca25 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@
import signal
import datetime
from contextlib import contextmanager
+import time
# Local imports
from .resources import Resources, ResourceType
@@ -95,7 +96,7 @@
class Scheduler():
def __init__(self, context,
- start_time, state, notification_handler,
+ start_time, state, notification_queue, notifier,
interrupt_callback=None,
ticker_callback=None):
@@ -126,8 +127,9 @@
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callback to send notifications to report back to the Scheduler's owner
- self.notify = notification_handler
+ # Message to send notifications back to the Scheduler's owner
+ self._notification_queue = notification_queue
+ self._notifier = notifier
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -294,7 +296,7 @@
job_action=job.action_name,
job_status=status,
element=element)
- self.notify(notification)
+ self._notify(notification)
self._sched()
# check_cache_size():
@@ -355,7 +357,7 @@
full_name=job.name,
job_action=job.action_name,
elapsed_time=self.elapsed_time())
- self.notify(notification)
+ self._notify(notification)
job.start()
# Callback for the cache size job
@@ -575,7 +577,7 @@
return
notification = Notification(NotificationType.INTERRUPT)
- self.notify(notification)
+ self._notify(notification)
# _terminate_event():
#
@@ -635,9 +637,19 @@
# Regular timeout for driving status in the UI
def _tick(self):
notification = Notification(NotificationType.TICK)
- self.notify(notification)
+ self._notify(notification)
self.loop.call_later(1, self._tick)
+ def _notify(self, notification):
+ self._notification_queue.put(notification)
+ x = 0
+ while self._notification_queue.empty():
+ time.sleep(0.1)
+ x = x +1
+ if x == 10:
+ raise ValueError("queue still empty")
+ self._notifier()
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c97cf28..ee5a48b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -28,8 +28,10 @@
import shutil
import tarfile
import tempfile
+import multiprocessing as mp
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -80,10 +82,12 @@
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
+ self._notification_queue = mp.Queue()
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
+ self._scheduler_notification_handler,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback)
self._first_non_track_queue = None
@@ -1586,7 +1590,16 @@
return element_targets, artifact_refs
- def _scheduler_notification_handler(self, notification):
+ def _scheduler_notification_handler(self):
+ # Check the queue is there and a scheduler is running
+ assert self._notification_queue
+ notification = None
+ #while notification is None:
+ #try:
+ notification = self._notification_queue.get_nowait()
+ #except queue.Empty:
+ # pass
+
if notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK: