Stream: handle frontend notifications with a basic asyncio event loop
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 122ba37..0d06500 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -68,6 +68,7 @@
     START = "start"
     TASK_GROUPS = "task_groups"
     ELEMENT_TOTALS = "element_totals"
+    FINISH = "finish"
 
 
 # Notification()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 74f7755..f0f6138 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -74,6 +74,7 @@
         self.queues = []             # Queue objects
         self.len_session_elements = None
         self.len_total_elements = None
+        self.loop = None
 
         #
         # Private members
@@ -141,26 +142,37 @@
         self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
                                               kwargs=kwargs, name=process_name)
 
         self._subprocess.start()
 
         # TODO connect signal handlers with asyncio
-        while self._subprocess.exitcode is None:
-            # check every given time interval on subprocess state
-            self._subprocess.join(0.01)
-            # if no exit code, go back to checking the message queue
-            self._loop()
+        # Handle notifications from an asyncio event loop instead of polling:
+        # _loop() runs whenever the notification queue becomes readable, and
+        # the notification handler stops the loop on FINISH or EXCEPTION.
+        # NOTE(review): asyncio reports exceptions raised in a reader callback
+        # to the loop's exception handler rather than raising them from
+        # run_forever() - confirm SubprocessException still reaches the caller.
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
+        self._start_listening()
+        try:
+            self.loop.run_forever()
+        finally:
+            # Release the reader and the loop even if we are interrupted
+            self._stop_listening()
+            self.loop.close()
+            self.loop = None
 
         # Set main process back
         utils._reset_main_pid()
 
         # Ensure no more notifcations to process
         try:
             while True:
                 notification = self._notify_front_queue.get_nowait()
                 self._notification_handler(notification)
         except queue.Empty:
             pass
 
 
     # cleanup()
     #
@@ -1456,6 +1468,9 @@
 
         status = self._scheduler.run(self.queues)
 
+        # Scheduler has finished running, send frontend notification
+        self._notify_front(Notification(NotificationType.FINISH))
+
         if status == SchedStatus.ERROR:
             raise StreamError()
         if status == SchedStatus.TERMINATED:
@@ -1774,12 +1789,18 @@
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
+            # Stop the event loop, if one is running, so that run_forever() returns
+            if self.loop:
+                self.loop.stop()
             # Regenerate the exception here, so we don't have to pickle it
             raise SubprocessException(**notification.exception)
         elif notification.notification_type == NotificationType.START:
             self._session_start_callback()
         elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
             self.len_session_elements, self.len_total_elements = notification.element_totals
+        elif notification.notification_type == NotificationType.FINISH:
+            if self.loop:
+                self.loop.stop()
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1797,16 +1818,27 @@
 
     # The code to be run by the Stream's event loop while delegating
     # work to a subprocess with the @subprocessed decorator
     def _loop(self):
         assert self._notify_front_queue
         # Check for and process new messages
         while True:
             try:
                 notification = self._notify_front_queue.get_nowait()
                 self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break
+
+    # Register the read end of the notification queue with the event loop,
+    # so that _loop() is called whenever a notification can be read
+    def _start_listening(self):
+        if self._notify_front_queue:
+            self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+    # Unregister the read end of the notification queue from the event loop
+    def _stop_listening(self):
+        if self._notify_front_queue:
+            self.loop.remove_reader(self._notify_front_queue._reader.fileno())
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing