Merge branch 'aevri/spawn' into 'master'

Support the 'spawn' method of starting processes

See merge request BuildStream/buildstream!1511
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index c188720..2bdaab0 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -189,6 +189,12 @@
     SOURCE_CACHE_SERVICE: http://docker:50052
     PYTEST_ARGS: "--color=yes --remote-execution"
 
+tests-spawn-multiprocessing-start-method:
+  image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+  <<: *tests
+  variables:
+    BST_FORCE_START_METHOD: "spawn"
+
 
 # Lint separately from testing
 lint:
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 5a585b2..1365ced 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -1,3 +1,4 @@
+import multiprocessing
 import os
 import sys
 from functools import partial
@@ -217,6 +218,18 @@
                        .format(stream.name), err=True)
             sys.exit(-1)
 
+    # We can only set the global multiprocessing start method once; for that
+    # reason the multiprocessing docs advise setting it inside the entry
+    # point, where it is easy to ensure the code path is only followed once.
+    if 'BST_FORCE_START_METHOD' in os.environ:
+        multiprocessing.set_start_method(os.environ['BST_FORCE_START_METHOD'])
+        print(
+            "BST_FORCE_START_METHOD: multiprocessing start method forced to:",
+            os.environ['BST_FORCE_START_METHOD'],
+            file=sys.stderr,
+            flush=True,
+        )
+
     original_main(self, args=args, prog_name=prog_name, complete_var=None,
                   standalone_mode=standalone_mode, **extra)
 
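
Background for the comment in the hunk above: the start method can only be
set once per process, which is why this change does it in the CLI entry
point. A minimal standalone sketch (not part of this change) of the failure
mode a second call would hit:

    import multiprocessing

    multiprocessing.set_start_method("spawn")
    try:
        # A second call without force=True is rejected by the stdlib.
        multiprocessing.set_start_method("fork")
    except RuntimeError as exc:
        print(exc)  # "context has already been set"
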
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py
new file mode 100644
index 0000000..c036651
--- /dev/null
+++ b/src/buildstream/_platform/multiprocessing.py
@@ -0,0 +1,108 @@
+#
+#  Copyright (C) 2019 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import multiprocessing
+
+
+# QueueManager()
+#
+# This abstracts our choice of creating picklable or non-picklable Queues.
+#
+# Note that when choosing the 'spawn' or 'forkserver' methods of starting
+# processes with the `multiprocessing` standard library module, we must use
+# only picklable types as parameters to jobs.
+#
+class QueueManager:
+    def make_queue_wrapper(self):
+        return _PlainQueueWrapper(multiprocessing.Queue())
+
+
+# PicklableQueueManager()
+#
+# A QueueManager that creates picklable types.
+#
+# Note that the requirement of being picklable adds extra runtime burden, as we
+# must create and maintain a `SyncManager` process that will create and manage
+# the real objects.
+#
+class PicklableQueueManager(QueueManager):
+    def __init__(self):
+        super().__init__()
+        self._manager = None
+
+    def make_queue_wrapper(self):
+        # Only SyncManager Queues are picklable, so we must make those. Avoid
+        # creating multiple expensive SyncManagers by keeping this one around.
+        if self._manager is None:
+            self._manager = multiprocessing.Manager()
+        return _SyncManagerQueueWrapper(self._manager.Queue())
+
+
+# QueueWrapper()
+#
+# This abstracts our choice of using picklable or non-picklable Queues.
+#
+class QueueWrapper:
+    pass
+
+
+class _PlainQueueWrapper(QueueWrapper):
+    def __init__(self, queue):
+        super().__init__()
+        self.queue = queue
+
+    def set_potential_callback_on_queue_event(self, event_loop, callback):
+        # Warning: Platform specific code up ahead
+        #
+        #   The multiprocessing.Queue object does not tell us how
+        #   to receive io events in the receiving process, so we
+        #   need to sneak in and get its file descriptor.
+        #
+        #   The _reader member of the Queue is currently private
+        #   but well known, perhaps it will become public:
+        #
+        #      http://bugs.python.org/issue3831
+        #
+        event_loop.add_reader(self.queue._reader.fileno(), callback)
+
+    def clear_potential_callback_on_queue_event(self, event_loop):
+        event_loop.remove_reader(self.queue._reader.fileno())
+
+    def close(self):
+        self.queue.close()
+
+
+class _SyncManagerQueueWrapper(QueueWrapper):
+    def __init__(self, queue):
+        super().__init__()
+        self.queue = queue
+
+    def set_potential_callback_on_queue_event(self, event_loop, callback):
+        # We can't easily support these callbacks for Queues managed by a
+        # SyncManager, so they are not supported for now. Later work should
+        # be able to support them with threading.
+        pass
+
+    def clear_potential_callback_on_queue_event(self, event_loop):
+        pass
+
+    def close(self):
+        # SyncManager queue proxies do not have a `close()` method; they rely
+        # on a callback on garbage collection to release resources. For our
+        # purposes the queue is invalid after closing, so it's ok to release it
+        # here.
+        self.queue = None
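
To illustrate the distinction the two managers above encapsulate, here is a
standalone sketch (a toy, not BuildStream code): a plain multiprocessing.Queue
may only be shared with a child through process inheritance and refuses
ordinary pickling, whereas a SyncManager queue proxy pickles fine because
unpickling it reconnects to the manager process.

    import multiprocessing
    import pickle

    if __name__ == '__main__':
        plain = multiprocessing.Queue()
        try:
            pickle.dumps(plain)
        except RuntimeError as exc:
            # "Queue objects should only be shared between processes
            # through inheritance"
            print("plain queue:", exc)

        manager = multiprocessing.Manager()
        proxy = manager.Queue()
        print("proxy pickles to", len(pickle.dumps(proxy)), "bytes")
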
diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py
index d6e0755..faf3d3c 100644
--- a/src/buildstream/_platform/platform.py
+++ b/src/buildstream/_platform/platform.py
@@ -18,6 +18,7 @@
 #  Authors:
 #        Tristan Maat <tristan.maat@codethink.co.uk>
 
+import multiprocessing
 import os
 import platform
 import sys
@@ -27,6 +28,8 @@
 from .._exceptions import PlatformError, ImplError, SandboxError
 from .. import utils
 
+from .multiprocessing import QueueManager, PicklableQueueManager
+
 
 class Platform():
     # Platform()
@@ -172,6 +175,25 @@
         uname_machine = platform.uname().machine
         return Platform.canonicalize_arch(uname_machine)
 
+    def make_queue_manager(self):
+        if self.does_multiprocessing_start_require_pickling():
+            return PicklableQueueManager()
+        else:
+            return QueueManager()
+
+    # does_multiprocessing_start_require_pickling():
+    #
+    # Returns True if the multiprocessing start method will pickle arguments
+    # to new processes.
+    #
+    # Returns:
+    #    (bool): Whether pickling is required or not
+    #
+    def does_multiprocessing_start_require_pickling(self):
+        # Note that if the start method has not been set before now, it will be
+        # set to the platform default by `get_start_method`.
+        return multiprocessing.get_start_method() != 'fork'
+
     ##################################################################
     #                        Sandbox functions                       #
     ##################################################################
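
For reference, what does_multiprocessing_start_require_pickling() reports
depends on the interpreter and OS: CPython defaults to 'fork' on Linux and to
'spawn' on Windows and (since Python 3.8) macOS. A quick check mirroring the
logic above:

    import multiprocessing

    # As the comment in platform.py notes, get_start_method() fixes the
    # platform default if no method has been set yet.
    method = multiprocessing.get_start_method()
    print("start method:", method)
    print("pickling required:", method != 'fork')
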
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7cbde2d..2c48837 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -21,6 +21,7 @@
 
 # System imports
 import os
+import pickle
 import sys
 import signal
 import datetime
@@ -86,6 +87,37 @@
     SUBCLASS_CUSTOM_MESSAGE = 5
 
 
+# _do_pickled_child_job()
+#
+# Unpickle the supplied 'pickled' job and call 'child_action' on it.
+#
+# This is expected to be run in a subprocess started from the main process, as
+# such it will fix up any globals to be in the expected state.
+#
+# Args:
+#    pickled     (BytesIO): The pickled job to execute.
+#    *child_args (any)    : Any parameters to be passed to `child_action`.
+#
+def _do_pickled_child_job(pickled, *child_args):
+
+    utils._is_main_process = _not_main_process
+
+    child_job = pickle.load(pickled)
+    return child_job.child_action(*child_args)
+
+
+# _not_main_process()
+#
+# A function to replace `utils._is_main_process` when we're running in a
+# subprocess that was not forked; the main process pid recorded at import
+# time is not inherited in this case.
+#
+# Note that by definition we can never be the main process here.
+#
+def _not_main_process():
+    return False
+
+
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
@@ -136,7 +168,7 @@
         # Private members
         #
         self._scheduler = scheduler            # The scheduler
-        self._queue = None                     # A message passing queue
+        self._queue_wrapper = None             # A wrapper of a message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
         self._listening = False                # Whether the parent is currently listening
@@ -162,8 +194,7 @@
     # Starts the job.
     #
     def start(self):
-
-        self._queue = multiprocessing.Queue()
+        self._queue_wrapper = self._scheduler.ipc_queue_manager.make_queue_wrapper()
 
         self._tries += 1
         self._parent_start_listening()
@@ -179,12 +210,18 @@
             self._message_element_key
         )
 
-        # Make sure that picklability doesn't break, by exercising it during
-        # our test suite.
-        if self._scheduler.context.is_running_in_test_suite:
-            pickle_child_job(child_job, self._scheduler.context.get_projects())
-
-        self._process = Process(target=child_job.child_action, args=[self._queue])
+        if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
+            pickled = pickle_child_job(
+                child_job, self._scheduler.context.get_projects())
+            self._process = Process(
+                target=_do_pickled_child_job,
+                args=[pickled, self._queue_wrapper],
+            )
+        else:
+            self._process = Process(
+                target=child_job.child_action,
+                args=[self._queue_wrapper],
+            )
 
         # Block signals which are handled in the main process such that
         # the child process does not inherit the parent's state, but the main
@@ -478,7 +515,7 @@
         self._scheduler.job_completed(self, status)
 
         # Force the deletion of the queue and process objects to try and clean up FDs
-        self._queue = self._process = None
+        self._queue_wrapper = self._process = None
 
     # _parent_process_envelope()
     #
@@ -523,8 +560,8 @@
     # in the parent process.
     #
     def _parent_process_queue(self):
-        while not self._queue.empty():
-            envelope = self._queue.get_nowait()
+        while not self._queue_wrapper.queue.empty():
+            envelope = self._queue_wrapper.queue.get_nowait()
             self._parent_process_envelope(envelope)
 
     # _parent_recv()
@@ -540,20 +577,9 @@
     # Starts listening on the message queue
     #
     def _parent_start_listening(self):
-        # Warning: Platform specific code up ahead
-        #
-        #   The multiprocessing.Queue object does not tell us how
-        #   to receive io events in the receiving process, so we
-        #   need to sneak in and get its file descriptor.
-        #
-        #   The _reader member of the Queue is currently private
-        #   but well known, perhaps it will become public:
-        #
-        #      http://bugs.python.org/issue3831
-        #
         if not self._listening:
-            self._scheduler.loop.add_reader(
-                self._queue._reader.fileno(), self._parent_recv)
+            self._queue_wrapper.set_potential_callback_on_queue_event(
+                self._scheduler.loop, self._parent_recv)
             self._listening = True
 
     # _parent_stop_listening()
@@ -562,7 +588,8 @@
     #
     def _parent_stop_listening(self):
         if self._listening:
-            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+            self._queue_wrapper.clear_potential_callback_on_queue_event(
+                self._scheduler.loop)
             self._listening = False
 
 
@@ -605,7 +632,7 @@
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._queue = None
+        self._queue_wrapper = None
 
     # message():
     #
@@ -692,7 +719,7 @@
     # Args:
-    #    queue (multiprocessing.Queue): The message queue for IPC
+    #    queue_wrapper (QueueWrapper): A wrapper of the message queue for IPC
     #
-    def child_action(self, queue):
+    def child_action(self, queue_wrapper):
 
         # This avoids some SIGTSTP signals from grandchildren
         # getting propagated up to the master process
@@ -710,7 +737,7 @@
         #
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._queue = queue
+        self._queue_wrapper = queue_wrapper
         self._messenger.set_message_handler(self._child_message_handler)
 
         starttime = datetime.datetime.now()
@@ -808,7 +835,7 @@
     #                        instances). This is sent to the parent Job.
     #
     def _send_message(self, message_type, message_data):
-        self._queue.put(_Envelope(message_type, message_data))
+        self._queue_wrapper.queue.put(_Envelope(message_type, message_data))
 
     # _child_send_error()
     #
@@ -854,7 +881,7 @@
     #    exit_code (_ReturnCode): The exit code to exit with
     #
     def _child_shutdown(self, exit_code):
-        self._queue.close()
+        self._queue_wrapper.close()
         assert isinstance(exit_code, _ReturnCode)
         sys.exit(exit_code.value)
 
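
The pickled-job round trip in job.py can be seen in miniature below. This is
a hypothetical, simplified stand-in (ToyJob, _run_pickled and the use of a
manager queue are illustrative assumptions; the real code builds the pickle
via pickle_child_job() and passes the queue wrapper separately):

    import io
    import multiprocessing
    import pickle

    class ToyJob:
        # Stand-in for a ChildJob; defined at module level so it pickles.
        def child_action(self, queue):
            queue.put("hello from the child")

    def _run_pickled(pickled, queue):
        # Mirrors _do_pickled_child_job(): unpickle the job, then run it.
        job = pickle.load(pickled)
        job.child_action(queue)

    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')
        pickled = io.BytesIO(pickle.dumps(ToyJob()))
        queue = multiprocessing.Manager().Queue()  # picklable, so spawn-safe
        process = multiprocessing.Process(target=_run_pickled,
                                          args=[pickled, queue])
        process.start()
        process.join()
        print(queue.get())
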
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d7cf5d..398e52e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -80,6 +80,11 @@
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
         self.suspended = False      # Whether the scheduler is currently suspended
 
+        # A manager for creating and monitoring IPC queues. Note that this
+        # can't be part of the platform or context, as it is not always
+        # picklable.
+        self.ipc_queue_manager = self.context.platform.make_queue_manager()
+
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None            # Shared for Job access to observe the message queue
         self.internal_stops = 0     # Amount of SIGSTP signals we've introduced, this is shared with job.py
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 2c57925..b78c258 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -52,6 +52,9 @@
 _ALIAS_SEPARATOR = ':'
 _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"]
 
+# Main process pid
+_MAIN_PID = os.getpid()
+
 
 class UtilError(BstError):
     """Raised by utility functions when system calls fail.
@@ -699,17 +702,13 @@
     return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
 
 
-# Main process pid
-_main_pid = os.getpid()
-
-
 # _is_main_process()
 #
 # Return whether we are in the main process or not.
 #
 def _is_main_process():
-    assert _main_pid is not None
-    return os.getpid() == _main_pid
+    assert _MAIN_PID is not None
+    return os.getpid() == _MAIN_PID
 
 
 # Recursively remove directories, ignoring file permissions as much as
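
Why _is_main_process() needs the override installed by _do_pickled_child_job()
in job.py: module globals are re-evaluated when a spawned child re-imports the
module, so _MAIN_PID ends up being the child's own pid. A standalone sketch of
the failure (a hypothetical script, not BuildStream code):

    import multiprocessing
    import os

    _MAIN_PID = os.getpid()  # re-evaluated on import in a spawned child

    def check():
        # Forked child: inherits the parent's _MAIN_PID, prints False.
        # Spawned child: re-imports this module, so _MAIN_PID is its own
        # pid and this wrongly prints True - hence the override in job.py.
        print("is main process:", os.getpid() == _MAIN_PID)

    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')
        child = multiprocessing.Process(target=check)
        child.start()
        child.join()
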