Merge branch 'jmac/remote_execution_client' into 'master'

Remote execution client

See merge request BuildStream/buildstream!626
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 9a9f702..ce2b874 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -19,6 +19,7 @@
 
 import hashlib
 import itertools
+import io
 import multiprocessing
 import os
 import signal
@@ -76,6 +77,7 @@
     ################################################
     #     Implementation of abstract methods       #
     ################################################
+
     def contains(self, element, key):
         refpath = self._refpath(self.get_artifact_fullname(element, key))
 
@@ -153,6 +155,7 @@
         q = multiprocessing.Queue()
         for remote_spec in remote_specs:
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
 
             try:
@@ -259,6 +262,25 @@
 
         return False
 
+    def pull_tree(self, project, digest):
+        """ Pull a single Tree rather than an artifact.
+        Does not update local refs. """
+
+        for remote in self._remotes[project]:
+            try:
+                remote.init()
+
+                digest = self._fetch_tree(remote, digest)
+
+                # no need to pull from additional remotes
+                return digest
+
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.NOT_FOUND:
+                    raise
+
+        return None
+
     def link_key(self, element, oldkey, newkey):
         oldref = self.get_artifact_fullname(element, oldkey)
         newref = self.get_artifact_fullname(element, newkey)
@@ -267,8 +289,46 @@
 
         self.set_ref(newref, tree)
 
+    def _push_refs_to_remote(self, refs, remote):
+        skipped_remote = True
+        try:
+            for ref in refs:
+                tree = self.resolve_ref(ref)
+
+                # Check whether ref is already on the server in which case
+                # there is no need to push the artifact
+                try:
+                    request = buildstream_pb2.GetReferenceRequest()
+                    request.key = ref
+                    response = remote.ref_storage.GetReference(request)
+
+                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+                        # ref is already on the server with the same tree
+                        continue
+
+                except grpc.RpcError as e:
+                    if e.code() != grpc.StatusCode.NOT_FOUND:
+                        # Intentionally re-raise RpcError for outer except block.
+                        raise
+
+                self._send_directory(remote, tree)
+
+                request = buildstream_pb2.UpdateReferenceRequest()
+                request.keys.append(ref)
+                request.digest.hash = tree.hash
+                request.digest.size_bytes = tree.size_bytes
+                remote.ref_storage.UpdateReference(request)
+
+                skipped_remote = False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+        return not skipped_remote
+
     def push(self, element, keys):
-        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 
         project = element._get_project()
 
@@ -278,95 +338,77 @@
 
         for remote in push_remotes:
             remote.init()
-            skipped_remote = True
+
             element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
 
-            try:
-                for ref in refs:
-                    tree = self.resolve_ref(ref)
-
-                    # Check whether ref is already on the server in which case
-                    # there is no need to push the artifact
-                    try:
-                        request = buildstream_pb2.GetReferenceRequest()
-                        request.key = ref
-                        response = remote.ref_storage.GetReference(request)
-
-                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                            # ref is already on the server with the same tree
-                            continue
-
-                    except grpc.RpcError as e:
-                        if e.code() != grpc.StatusCode.NOT_FOUND:
-                            # Intentionally re-raise RpcError for outer except block.
-                            raise
-
-                    missing_blobs = {}
-                    required_blobs = self._required_blobs(tree)
-
-                    # Limit size of FindMissingBlobs request
-                    for required_blobs_group in _grouper(required_blobs, 512):
-                        request = remote_execution_pb2.FindMissingBlobsRequest()
-
-                        for required_digest in required_blobs_group:
-                            d = request.blob_digests.add()
-                            d.hash = required_digest.hash
-                            d.size_bytes = required_digest.size_bytes
-
-                        response = remote.cas.FindMissingBlobs(request)
-                        for digest in response.missing_blob_digests:
-                            d = remote_execution_pb2.Digest()
-                            d.hash = digest.hash
-                            d.size_bytes = digest.size_bytes
-                            missing_blobs[d.hash] = d
-
-                    # Upload any blobs missing on the server
-                    skipped_remote = False
-                    for digest in missing_blobs.values():
-                        uuid_ = uuid.uuid4()
-                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
-                                                  digest.hash, str(digest.size_bytes)])
-
-                        def request_stream(resname):
-                            with open(self.objpath(digest), 'rb') as f:
-                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
-                                offset = 0
-                                finished = False
-                                remaining = digest.size_bytes
-                                while not finished:
-                                    chunk_size = min(remaining, 64 * 1024)
-                                    remaining -= chunk_size
-
-                                    request = bytestream_pb2.WriteRequest()
-                                    request.write_offset = offset
-                                    # max. 64 kB chunks
-                                    request.data = f.read(chunk_size)
-                                    request.resource_name = resname
-                                    request.finish_write = remaining <= 0
-                                    yield request
-                                    offset += chunk_size
-                                    finished = request.finish_write
-                        response = remote.bytestream.Write(request_stream(resource_name))
-
-                    request = buildstream_pb2.UpdateReferenceRequest()
-                    request.keys.append(ref)
-                    request.digest.hash = tree.hash
-                    request.digest.size_bytes = tree.size_bytes
-                    remote.ref_storage.UpdateReference(request)
-
-                    pushed = True
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
-
-            if skipped_remote:
+            if self._push_refs_to_remote(refs, remote):
+                pushed = True
+            else:
                 self.context.message(Message(
                     None,
                     MessageType.SKIPPED,
                     "Remote ({}) already has {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))
+
+        return pushed
+
+    def push_directory(self, project, directory):
+
+        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+        if directory.ref is None:
+            return None
+
+        for remote in push_remotes:
+            remote.init()
+
+            self._send_directory(remote, directory.ref)
+
+        return directory.ref
+
+    def push_message(self, project, message):
+
+        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+        message_buffer = message.SerializeToString()
+        message_sha = hashlib.sha256(message_buffer)
+        message_digest = remote_execution_pb2.Digest()
+        message_digest.hash = message_sha.hexdigest()
+        message_digest.size_bytes = len(message_buffer)
+
+        for remote in push_remotes:
+            remote.init()
+
+            with io.BytesIO(message_buffer) as b:
+                self._send_blob(remote, message_digest, b)
+
+        return message_digest
+
+    def _verify_digest_on_remote(self, remote, digest):
+        # Check whether ref is already on the server in which case
+        # there is no need to push the artifact
+        request = remote_execution_pb2.FindMissingBlobsRequest()
+        request.blob_digests.extend([digest])
+
+        response = remote.cas.FindMissingBlobs(request)
+        if digest in response.missing_blob_digests:
+            return False
+
+        return True
+
+    def verify_digest_pushed(self, project, digest):
+
+        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+        pushed = False
+
+        for remote in push_remotes:
+            remote.init()
+
+            if self._verify_digest_on_remote(remote, digest):
+                pushed = True
+
         return pushed
 
     ################################################
@@ -599,6 +641,7 @@
     ################################################
     #             Local Private Methods            #
     ################################################
+
     def _checkout(self, dest, tree):
         os.makedirs(dest, exist_ok=True)
 
@@ -761,16 +804,16 @@
             #
             q.put(str(e))
 
-    def _required_blobs(self, tree):
+    def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
-        d.hash = tree.hash
-        d.size_bytes = tree.size_bytes
+        d.hash = directory_digest.hash
+        d.size_bytes = directory_digest.size_bytes
         yield d
 
         directory = remote_execution_pb2.Directory()
 
-        with open(self.objpath(tree), 'rb') as f:
+        with open(self.objpath(directory_digest), 'rb') as f:
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
@@ -782,16 +825,16 @@
         for dirnode in directory.directories:
             yield from self._required_blobs(dirnode.digest)
 
-    def _fetch_blob(self, remote, digest, out):
+    def _fetch_blob(self, remote, digest, stream):
         resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
         for response in remote.bytestream.Read(request):
-            out.write(response.data)
+            stream.write(response.data)
+        stream.flush()
 
-        out.flush()
-        assert digest.size_bytes == os.fstat(out.fileno()).st_size
+        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
     def _fetch_directory(self, remote, tree):
         objpath = self.objpath(tree)
@@ -827,6 +870,92 @@
             digest = self.add_object(path=out.name)
             assert digest.hash == tree.hash
 
+    def _fetch_tree(self, remote, digest):
+        # download but do not store the Tree object
+        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
+            self._fetch_blob(remote, digest, out)
+
+            tree = remote_execution_pb2.Tree()
+
+            with open(out.name, 'rb') as f:
+                tree.ParseFromString(f.read())
+
+            tree.children.extend([tree.root])
+            for directory in tree.children:
+                for filenode in directory.files:
+                    fileobjpath = self.objpath(filenode.digest)
+                    if os.path.exists(fileobjpath):
+                        # already in local cache
+                        continue
+
+                    with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
+                        self._fetch_blob(remote, filenode.digest, f)
+
+                        added_digest = self.add_object(path=f.name)
+                        assert added_digest.hash == filenode.digest.hash
+
+                # place directory blob only in final location when we've downloaded
+                # all referenced blobs to avoid dangling references in the repository
+                dirbuffer = directory.SerializeToString()
+                dirdigest = self.add_object(buffer=dirbuffer)
+                assert dirdigest.size_bytes == len(dirbuffer)
+
+        return dirdigest
+
+    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)])
+
+        def request_stream(resname, instream):
+            offset = 0
+            finished = False
+            remaining = digest.size_bytes
+            while not finished:
+                chunk_size = min(remaining, 64 * 1024)
+                remaining -= chunk_size
+
+                request = bytestream_pb2.WriteRequest()
+                request.write_offset = offset
+                # max. 64 kB chunks
+                request.data = instream.read(chunk_size)
+                request.resource_name = resname
+                request.finish_write = remaining <= 0
+
+                yield request
+
+                offset += chunk_size
+                finished = request.finish_write
+
+        response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+        assert response.committed_size == digest.size_bytes
+
+    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+        required_blobs = self._required_blobs(digest)
+
+        missing_blobs = dict()
+        # Limit size of FindMissingBlobs request
+        for required_blobs_group in _grouper(required_blobs, 512):
+            request = remote_execution_pb2.FindMissingBlobsRequest()
+
+            for required_digest in required_blobs_group:
+                d = request.blob_digests.add()
+                d.hash = required_digest.hash
+                d.size_bytes = required_digest.size_bytes
+
+            response = remote.cas.FindMissingBlobs(request)
+            for missing_digest in response.missing_blob_digests:
+                d = remote_execution_pb2.Digest()
+                d.hash = missing_digest.hash
+                d.size_bytes = missing_digest.size_bytes
+                missing_blobs[d.hash] = d
+
+        # Upload any blobs missing on the server
+        for blob_digest in missing_blobs.values():
+            with open(self.objpath(blob_digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
+                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+
 
 # Represents a single remote CAS cache.
 #
diff --git a/buildstream/_project.py b/buildstream/_project.py
index 2f8ae69..f0ca3d7 100644
--- a/buildstream/_project.py
+++ b/buildstream/_project.py
@@ -128,6 +128,7 @@
         self._shell_host_files = []   # A list of HostMount objects
 
         self.artifact_cache_specs = None
+        self.remote_execution_url = None
         self._sandbox = None
         self._splits = None
 
@@ -471,7 +472,7 @@
             'aliases', 'name',
             'artifacts', 'options',
             'fail-on-overlap', 'shell', 'fatal-warnings',
-            'ref-storage', 'sandbox', 'mirrors'
+            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
         ])
 
         #
@@ -482,6 +483,11 @@
         # Load artifacts pull/push configuration for this project
         self.artifact_cache_specs = ArtifactCache.specs_from_config_node(config, self.directory)
 
+        # Load remote-execution configuration for this project
+        remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
+        _yaml.node_validate(remote_execution, ['url'])
+        self.remote_execution_url = _yaml.node_get(remote_execution, str, 'url')
+
         # Load sandbox environment variables
         self.base_environment = _yaml.node_get(config, Mapping, 'environment')
         self.base_env_nocache = _yaml.node_get(config, list, 'environment-nocache')
diff --git a/buildstream/buildelement.py b/buildstream/buildelement.py
index 180bb86..5447c13 100644
--- a/buildstream/buildelement.py
+++ b/buildstream/buildelement.py
@@ -155,6 +155,9 @@
             command_dir = build_root
         sandbox.set_work_directory(command_dir)
 
+        # Tell sandbox which directory is preserved in the finished artifact
+        sandbox.set_output_directory(install_root)
+
         # Setup environment
         sandbox.set_environment(self.get_environment())
 
diff --git a/buildstream/data/projectconfig.yaml b/buildstream/data/projectconfig.yaml
index c1ad2d1..1da67a5 100644
--- a/buildstream/data/projectconfig.yaml
+++ b/buildstream/data/projectconfig.yaml
@@ -204,3 +204,6 @@
   # Command to run when `bst shell` does not provide a command
   #
   command: [ 'sh', '-i' ]
+
+remote-execution:
+  url: ""
\ No newline at end of file
diff --git a/buildstream/element.py b/buildstream/element.py
index ae8e101..dd7ccfe 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -95,6 +95,7 @@
 from ._platform import Platform
 from .plugin import CoreWarnings
 from .sandbox._config import SandboxConfig
+from .sandbox._sandboxremote import SandboxRemote
 
 from .storage.directory import Directory
 from .storage._filebaseddirectory import FileBasedDirectory
@@ -250,6 +251,12 @@
         # Extract Sandbox config
         self.__sandbox_config = self.__extract_sandbox_config(meta)
 
+        # Extract remote execution URL
+        if not self.__is_junction:
+            self.__remote_execution_url = project.remote_execution_url
+        else:
+            self.__remote_execution_url = None
+
     def __lt__(self, other):
         return self.name < other.name
 
@@ -1570,6 +1577,8 @@
                 finally:
                     if collect is not None:
                         try:
+                            # Sandbox will probably have replaced its virtual directory, so get it again
+                            sandbox_vroot = sandbox.get_virtual_directory()
                             collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
                         except VirtualDirectoryError:
                             # No collect directory existed
@@ -2146,7 +2155,32 @@
         project = self._get_project()
         platform = Platform.get_platform()
 
-        if directory is not None and os.path.exists(directory):
+        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
+            if not self.__artifacts.has_push_remotes(element=self):
+                # Give an early warning if remote execution will not work
+                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
+                                   .format(self.name) +
+                                   "The remote artifact server(s) may not be correctly configured or contactable.")
+
+            self.info("Using a remote sandbox for artifact {}".format(self.name))
+
+            sandbox = SandboxRemote(context, project,
+                                    directory,
+                                    stdout=stdout,
+                                    stderr=stderr,
+                                    config=config,
+                                    server_url=self.__remote_execution_url,
+                                    allow_real_directory=False)
+            yield sandbox
+
+        elif directory is not None and os.path.exists(directory):
+            if self.__remote_execution_url:
+                self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
+                          .format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
+                          .format(kind=self.get_kind()), warning_token="remote-failure")
+
+                self.info("Falling back to local sandbox for artifact {}".format(self.name))
+
             sandbox = platform.create_sandbox(context, project,
                                               directory,
                                               stdout=stdout,
diff --git a/buildstream/plugins/elements/autotools.py b/buildstream/plugins/elements/autotools.py
index 14d04d9..cf5e856 100644
--- a/buildstream/plugins/elements/autotools.py
+++ b/buildstream/plugins/elements/autotools.py
@@ -57,7 +57,8 @@
 
 # Element implementation for the 'autotools' kind.
 class AutotoolsElement(BuildElement):
-    pass
+    # Supports virtual directories (required for remote execution)
+    BST_VIRTUAL_DIRECTORY = True
 
 
 # Plugin entry point
diff --git a/buildstream/plugins/elements/cmake.py b/buildstream/plugins/elements/cmake.py
index 8126a80..2cb2601 100644
--- a/buildstream/plugins/elements/cmake.py
+++ b/buildstream/plugins/elements/cmake.py
@@ -56,7 +56,8 @@
 
 # Element implementation for the 'cmake' kind.
 class CMakeElement(BuildElement):
-    pass
+    # Supports virtual directories (required for remote execution)
+    BST_VIRTUAL_DIRECTORY = True
 
 
 # Plugin entry point
diff --git a/buildstream/plugins/elements/make.py b/buildstream/plugins/elements/make.py
index 1f37cb4..6c500f3 100644
--- a/buildstream/plugins/elements/make.py
+++ b/buildstream/plugins/elements/make.py
@@ -38,7 +38,8 @@
 
 # Element implementation for the 'make' kind.
 class MakeElement(BuildElement):
-    pass
+    # Supports virtual directories (required for remote execution)
+    BST_VIRTUAL_DIRECTORY = True
 
 
 # Plugin entry point
diff --git a/buildstream/plugins/elements/meson.py b/buildstream/plugins/elements/meson.py
index 228e90a..9e0edf1 100644
--- a/buildstream/plugins/elements/meson.py
+++ b/buildstream/plugins/elements/meson.py
@@ -53,7 +53,8 @@
 
 # Element implementation for the 'meson' kind.
 class MesonElement(BuildElement):
-    pass
+    # Supports virtual directories (required for remote execution)
+    BST_VIRTUAL_DIRECTORY = True
 
 
 # Plugin entry point
diff --git a/buildstream/plugins/elements/qmake.py b/buildstream/plugins/elements/qmake.py
index 7896692..9f5bc40 100644
--- a/buildstream/plugins/elements/qmake.py
+++ b/buildstream/plugins/elements/qmake.py
@@ -33,7 +33,8 @@
 
 # Element implementation for the 'qmake' kind.
 class QMakeElement(BuildElement):
-    pass
+    # Supports virtual directories (required for remote execution)
+    BST_VIRTUAL_DIRECTORY = True
 
 
 # Plugin entry point
diff --git a/buildstream/sandbox/__init__.py b/buildstream/sandbox/__init__.py
index 53e170f..2c76e9e 100644
--- a/buildstream/sandbox/__init__.py
+++ b/buildstream/sandbox/__init__.py
@@ -20,3 +20,4 @@
 from .sandbox import Sandbox, SandboxFlags
 from ._sandboxchroot import SandboxChroot
 from ._sandboxbwrap import SandboxBwrap
+from ._sandboxremote import SandboxRemote
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
new file mode 100644
index 0000000..296b203
--- /dev/null
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -0,0 +1,226 @@
+#!/usr/bin/env python3
+#
+#  Copyright (C) 2018 Bloomberg LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Jim MacArthur <jim.macarthur@codethink.co.uk>
+
+import os
+from urllib.parse import urlparse
+
+import grpc
+
+from . import Sandbox
+from ..storage._filebaseddirectory import FileBasedDirectory
+from ..storage._casbaseddirectory import CasBasedDirectory
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._artifactcache.cascache import CASCache
+
+
+class SandboxError(Exception):
+    pass
+
+
+# SandboxRemote()
+#
+# This isn't really a sandbox, it's a stub which sends all the sources and build
+# commands to a remote server and retrieves the results from it.
+#
+class SandboxRemote(Sandbox):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.cascache = None
+
+        url = urlparse(kwargs['server_url'])
+        if not url.scheme or not url.hostname or not url.port:
+            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
+                               .format(kwargs['server_url']) +
+                               "It should be of the form <protocol>://<domain name>:<port>.")
+        elif url.scheme != 'http':
+            raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
+                               "Only plain HTTP is currenlty supported (no HTTPS).")
+
+        self.server_url = '{}:{}'.format(url.hostname, url.port)
+
+    def _get_cascache(self):
+        if self.cascache is None:
+            self.cascache = CASCache(self._get_context())
+            self.cascache.setup_remotes(use_config=True)
+        return self.cascache
+
+    def run_remote_command(self, command, input_root_digest, working_directory, environment):
+        # Sends an execution request to the remote execution server.
+        #
+        # This function blocks until it gets a response from the server.
+        #
+        environment_variables = [remote_execution_pb2.Command.
+                                 EnvironmentVariable(name=k, value=v)
+                                 for (k, v) in environment.items()]
+
+        # Create and send the Command object.
+        remote_command = remote_execution_pb2.Command(arguments=command,
+                                                      working_directory=working_directory,
+                                                      environment_variables=environment_variables,
+                                                      output_files=[],
+                                                      output_directories=[self._output_directory],
+                                                      platform=None)
+
+        cascache = self._get_cascache()
+        # Upload the Command message to the remote CAS server
+        command_digest = cascache.push_message(self._get_project(), remote_command)
+        if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
+            # Command push failed
+            return None
+
+        # Create and send the action.
+        action = remote_execution_pb2.Action(command_digest=command_digest,
+                                             input_root_digest=input_root_digest,
+                                             timeout=None,
+                                             do_not_cache=False)
+
+        # Upload the Action message to the remote CAS server
+        action_digest = cascache.push_message(self._get_project(), action)
+        if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest):
+            # Action push failed
+            return None
+
+        # Next, try to create a communication channel to the BuildGrid server.
+        channel = grpc.insecure_channel(self.server_url)
+        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
+        request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
+                                                      skip_cache_lookup=False)
+        try:
+            operation_iterator = stub.Execute(request)
+        except grpc.RpcError:
+            return None
+
+        operation = None
+        with self._get_context().timed_activity("Waiting for the remote build to complete"):
+            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
+            # which will check the server is actually contactable. However, calling it when the
+            # server is available seems to cause .code() to hang forever.
+            for operation in operation_iterator:
+                if operation.done:
+                    break
+
+        return operation
+
+    def process_job_output(self, output_directories, output_files):
+        # Reads the remote execution server response to an execution request.
+        #
+        # output_directories is an array of OutputDirectory objects.
+        # output_files is an array of OutputFile objects.
+        #
+        # We only specify one output_directory, so it's an error
+        # for there to be any output files or more than one directory at the moment.
+        #
+        if output_files:
+            raise SandboxError("Output files were returned when we didn't request any.")
+        elif not output_directories:
+            error_text = "No output directory was returned from the build server."
+            raise SandboxError(error_text)
+        elif len(output_directories) > 1:
+            error_text = "More than one output directory was returned from the build server: {}."
+            raise SandboxError(error_text.format(output_directories))
+
+        tree_digest = output_directories[0].tree_digest
+        if tree_digest is None or not tree_digest.hash:
+            raise SandboxError("Output directory structure had no digest attached.")
+
+        cascache = self._get_cascache()
+        # Now do a pull to ensure we have the necessary parts.
+        dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
+        if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
+            raise SandboxError("Output directory structure pulling from remote failed.")
+
+        path_components = os.path.split(self._output_directory)
+
+        # Now what we have is a digest for the output. Once we return, the calling process will
+        # attempt to descend into our directory and find that directory, so we need to overwrite
+        # that.
+
+        if not path_components:
+            # The artifact wants the whole directory; we could just return the returned hash in its
+            # place, but we don't have a means to do that yet.
+            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
+
+        # At the moment, we will get the whole directory back in the first directory argument and we need
+        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
+        # from another hash will be interesting, though...
+
+        new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
+        self._set_virtual_directory(new_dir)
+
+    def run(self, command, flags, *, cwd=None, env=None):
+        # Upload sources
+        upload_vdir = self.get_virtual_directory()
+
+        if isinstance(upload_vdir, FileBasedDirectory):
+            # Make a new temporary directory to put source in
+            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
+            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
+
+        upload_vdir.recalculate_hash()
+
+        cascache = self._get_cascache()
+        # Now, push that key (without necessarily needing a ref) to the remote.
+        vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
+        if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
+            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
+
+        # Set up environment and working directory
+        if cwd is None:
+            cwd = self._get_work_directory()
+
+        if cwd is None:
+            cwd = '/'
+
+        if env is None:
+            env = self._get_environment()
+
+        # We want command args as a list of strings
+        if isinstance(command, str):
+            command = [command]
+
+        # Now transmit the command to execute
+        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
+
+        if operation is None:
+            # Failure of remote execution, usually due to an error in BuildStream
+            # NB This error could be raised in __run_remote_command
+            raise SandboxError("No response returned from server")
+
+        assert not operation.HasField('error') and operation.HasField('response')
+
+        execution_response = remote_execution_pb2.ExecuteResponse()
+        # The response is expected to be an ExecutionResponse message
+        assert operation.response.Is(execution_response.DESCRIPTOR)
+
+        operation.response.Unpack(execution_response)
+
+        if execution_response.status.code != 0:
+            # A normal error during the build: the remote execution system
+            # has worked correctly but the command failed.
+            # execution_response.error also contains 'message' (str) and
+            # 'details' (iterator of Any) which we ignore at the moment.
+            return execution_response.status.code
+
+        action_result = execution_response.result
+
+        self.process_job_output(action_result.output_directories, action_result.output_files)
+
+        return 0
diff --git a/buildstream/sandbox/sandbox.py b/buildstream/sandbox/sandbox.py
index 87a2fb9..9d34f01 100644
--- a/buildstream/sandbox/sandbox.py
+++ b/buildstream/sandbox/sandbox.py
@@ -99,9 +99,11 @@
         self.__stdout = kwargs['stdout']
         self.__stderr = kwargs['stderr']
 
-        # Setup the directories. Root should be available to subclasses, hence
-        # being single-underscore. The others are private to this class.
+        # Setup the directories. Root and output_directory should be
+        # available to subclasses, hence being single-underscore. The
+        # others are private to this class.
         self._root = os.path.join(directory, 'root')
+        self._output_directory = None
         self.__directory = directory
         self.__scratch = os.path.join(self.__directory, 'scratch')
         for directory_ in [self._root, self.__scratch]:
@@ -144,11 +146,17 @@
                 self._vdir = FileBasedDirectory(self._root)
         return self._vdir
 
+    def _set_virtual_directory(self, virtual_directory):
+        """ Sets virtual directory. Useful after remote execution
+        has rewritten the working directory.
+        """
+        self._vdir = virtual_directory
+
     def set_environment(self, environment):
         """Sets the environment variables for the sandbox
 
         Args:
-           directory (dict): The environment variables to use in the sandbox
+           environment (dict): The environment variables to use in the sandbox
         """
         self.__env = environment
 
@@ -160,6 +168,15 @@
         """
         self.__cwd = directory
 
+    def set_output_directory(self, directory):
+        """Sets the output directory - the directory which is preserved
+        as an artifact after assembly.
+
+        Args:
+           directory (str): An absolute path within the sandbox
+        """
+        self._output_directory = directory
+
     def mark_directory(self, directory, *, artifact=False):
         """Marks a sandbox directory and ensures it will exist
 
diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py
index 5ca1007..d580635 100644
--- a/buildstream/storage/_casbaseddirectory.py
+++ b/buildstream/storage/_casbaseddirectory.py
@@ -543,6 +543,15 @@
                 filelist.append(k)
         return filelist
 
+    def recalculate_hash(self):
+        """ Recalcuates the hash for this directory and store the results in
+        the cache. If this directory has a parent, tell it to
+        recalculate (since changing this directory changes an entry in
+        the parent). Hashes for subdirectories also get recalculated.
+        """
+        self._recalculate_recursing_up()
+        self._recalculate_recursing_down()
+
     def _get_identifier(self):
         path = ""
         if self.parent:
diff --git a/doc/source/format_project.rst b/doc/source/format_project.rst
index b43e670..3198414 100644
--- a/doc/source/format_project.rst
+++ b/doc/source/format_project.rst
@@ -204,6 +204,24 @@
 You can also specify a list of caches here; earlier entries in the list
 will have higher priority than later ones.
 
+Remote execution
+~~~~~~~~~~~~~~~~
+BuildStream supports remote execution using the Google Remote Execution API
+(REAPI). A description of how remote execution works is beyond the scope
+of this document, but you can specify a remote server complying with the REAPI
+using the `remote-execution` option:
+
+.. code:: yaml
+
+  remote-execution:
+
+    # A url defining a remote execution server
+    url: http://buildserver.example.com:50051
+
+The url should contain a hostname and port separated by ':'. Only plain HTTP is
+currently suported (no HTTPS).
+
+The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
 
 .. _project_essentials_mirrors:
 
diff --git a/tests/artifactcache/project/elements/compose-all.bst b/tests/artifactcache/project/elements/compose-all.bst
new file mode 100644
index 0000000..ba47081
--- /dev/null
+++ b/tests/artifactcache/project/elements/compose-all.bst
@@ -0,0 +1,12 @@
+kind: compose
+
+depends:
+- filename: import-bin.bst
+  type: build
+- filename: import-dev.bst
+  type: build
+
+config:
+  # Dont try running the sandbox, we dont have a
+  # runtime to run anything in this context.
+  integrate: False
diff --git a/tests/artifactcache/project/elements/import-bin.bst b/tests/artifactcache/project/elements/import-bin.bst
new file mode 100644
index 0000000..a847c0c
--- /dev/null
+++ b/tests/artifactcache/project/elements/import-bin.bst
@@ -0,0 +1,4 @@
+kind: import
+sources:
+- kind: local
+  path: files/bin-files
diff --git a/tests/artifactcache/project/elements/import-dev.bst b/tests/artifactcache/project/elements/import-dev.bst
new file mode 100644
index 0000000..152a546
--- /dev/null
+++ b/tests/artifactcache/project/elements/import-dev.bst
@@ -0,0 +1,4 @@
+kind: import
+sources:
+- kind: local
+  path: files/dev-files
diff --git a/tests/artifactcache/project/elements/target.bst b/tests/artifactcache/project/elements/target.bst
new file mode 100644
index 0000000..ba489f1
--- /dev/null
+++ b/tests/artifactcache/project/elements/target.bst
@@ -0,0 +1,9 @@
+kind: stack
+description: |
+
+  Main stack target for the bst build test
+
+depends:
+- import-bin.bst
+- import-dev.bst
+- compose-all.bst
diff --git a/tests/artifactcache/project/files/bin-files/usr/bin/hello b/tests/artifactcache/project/files/bin-files/usr/bin/hello
new file mode 100755
index 0000000..f534a40
--- /dev/null
+++ b/tests/artifactcache/project/files/bin-files/usr/bin/hello
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+echo "Hello !"
diff --git a/tests/artifactcache/project/files/dev-files/usr/include/pony.h b/tests/artifactcache/project/files/dev-files/usr/include/pony.h
new file mode 100644
index 0000000..40bd0c2
--- /dev/null
+++ b/tests/artifactcache/project/files/dev-files/usr/include/pony.h
@@ -0,0 +1,12 @@
+#ifndef __PONY_H__
+#define __PONY_H__
+
+#define PONY_BEGIN "Once upon a time, there was a pony."
+#define PONY_END "And they lived happily ever after, the end."
+
+#define MAKE_PONY(story)  \
+  PONY_BEGIN \
+  story \
+  PONY_END
+
+#endif /* __PONY_H__ */
diff --git a/tests/artifactcache/project/project.conf b/tests/artifactcache/project/project.conf
new file mode 100644
index 0000000..854e386
--- /dev/null
+++ b/tests/artifactcache/project/project.conf
@@ -0,0 +1,4 @@
+# Project config for frontend build test
+name: test
+
+element-path: elements
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
new file mode 100644
index 0000000..6336e7a
--- /dev/null
+++ b/tests/artifactcache/pull.py
@@ -0,0 +1,320 @@
+import hashlib
+import multiprocessing
+import os
+import signal
+
+import pytest
+
+from buildstream import _yaml, _signals, utils
+from buildstream._artifactcache.cascache import CASCache
+from buildstream._context import Context
+from buildstream._project import Project
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+
+from tests.testutils import cli, create_artifact_share
+
+
+# Project directory
+DATA_DIR = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    "project",
+)
+
+
+# Handle messages from the pipeline
+def message_handler(message, context):
+    pass
+
+
+def tree_maker(cas, tree, directory):
+    if tree.root.ByteSize() == 0:
+        tree.root.CopyFrom(directory)
+
+    for directory_node in directory.directories:
+        child_directory = tree.children.add()
+
+        with open(cas.objpath(directory_node.digest), 'rb') as f:
+            child_directory.ParseFromString(f.read())
+
+        tree_maker(cas, tree, child_directory)
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_pull(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+        # Ensure CLI calls will use it
+        cli.configure(user_config)
+
+        # First build the project with the artifact cache configured
+        result = cli.run(project=project_dir, args=['build', 'target.bst'])
+        result.assert_success()
+
+        # Assert that we are now cached locally
+        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+        # Assert that we shared/pushed the cached artifact
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert share.has_artifact('test', 'target.bst', element_key)
+
+        # Delete the artifact locally
+        cli.remove_artifact_from_cache(project_dir, 'target.bst')
+
+        # Assert that we are not cached locally anymore
+        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        context.set_message_handler(message_handler)
+
+        # Load the project and CAS cache
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+        cas = CASCache(context)
+
+        # Assert that the element's artifact is **not** cached
+        element = project.load_elements(['target.bst'], cas)[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert not cas.contains(element, element_key)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_pull,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                'target.bst', element_key, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            error = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert not error
+        assert cas.contains(element, element_key)
+
+
+def _test_pull(user_config_file, project_dir, artifact_dir,
+               element_name, element_key, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Load the target element
+    element = project.load_elements([element_name], cas)[0]
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes(element=element):
+        # Push the element's artifact
+        if not cas.pull(element, element_key):
+            queue.put("Pull operation failed")
+        else:
+            queue.put(None)
+    else:
+        queue.put("No remote configured for element {}".format(element_name))
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_pull_tree(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+        # Ensure CLI calls will use it
+        cli.configure(user_config)
+
+        # First build the project with the artifact cache configured
+        result = cli.run(project=project_dir, args=['build', 'target.bst'])
+        result.assert_success()
+
+        # Assert that we are now cached locally
+        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+        # Assert that we shared/pushed the cached artifact
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert share.has_artifact('test', 'target.bst', element_key)
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        context.set_message_handler(message_handler)
+
+        # Load the project and CAS cache
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+        cas = CASCache(context)
+
+        # Assert that the element's artifact is cached
+        element = project.load_elements(['target.bst'], cas)[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert cas.contains(element, element_key)
+
+        # Retrieve the Directory object from the cached artifact
+        artifact_ref = cas.get_artifact_fullname(element, element_key)
+        artifact_digest = cas.resolve_ref(artifact_ref)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_push_tree,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                artifact_digest, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            tree_hash, tree_size = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert tree_hash and tree_size
+
+        # Now delete the artifact locally
+        cli.remove_artifact_from_cache(project_dir, 'target.bst')
+
+        # Assert that we are not cached locally anymore
+        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
+
+        tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
+                                                  size_bytes=tree_size)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        process = multiprocessing.Process(target=_test_pull_tree,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                tree_digest, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            directory_hash, directory_size = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert directory_hash and directory_size
+
+        directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
+                                                       size_bytes=directory_size)
+
+        # Ensure the entire Tree stucture has been pulled
+        assert os.path.exists(cas.objpath(directory_digest))
+
+
+def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes():
+        directory = remote_execution_pb2.Directory()
+
+        with open(cas.objpath(artifact_digest), 'rb') as f:
+            directory.ParseFromString(f.read())
+
+        # Build the Tree object while we are still cached
+        tree = remote_execution_pb2.Tree()
+        tree_maker(cas, tree, directory)
+
+        # Push the Tree as a regular message
+        tree_digest = cas.push_message(project, tree)
+
+        queue.put((tree_digest.hash, tree_digest.size_bytes))
+    else:
+        queue.put("No remote configured")
+
+
+def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes():
+        # Pull the artifact using the Tree object
+        directory_digest = cas.pull_tree(project, artifact_digest)
+        queue.put((directory_digest.hash, directory_digest.size_bytes))
+    else:
+        queue.put("No remote configured")
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
new file mode 100644
index 0000000..bdeb868
--- /dev/null
+++ b/tests/artifactcache/push.py
@@ -0,0 +1,312 @@
+import multiprocessing
+import os
+import signal
+
+import pytest
+
+from pluginbase import PluginBase
+from buildstream import _yaml, _signals, utils
+from buildstream._artifactcache.cascache import CASCache
+from buildstream._context import Context
+from buildstream._project import Project
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from buildstream.storage._casbaseddirectory import CasBasedDirectory
+
+from tests.testutils import cli, create_artifact_share
+
+
+# Project directory
+DATA_DIR = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    "project",
+)
+
+
+# Handle messages from the pipeline
+def message_handler(message, context):
+    pass
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # First build the project without the artifact cache configured
+    result = cli.run(project=project_dir, args=['build', 'target.bst'])
+    result.assert_success()
+
+    # Assert that we are now cached locally
+    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = artifact_dir
+        context.set_message_handler(message_handler)
+
+        # Load the project manually
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+
+        # Create a local CAS cache handle
+        cas = CASCache(context)
+
+        # Assert that the element's artifact is cached
+        element = project.load_elements(['target.bst'], cas)[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert cas.contains(element, element_key)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_push,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                'target.bst', element_key, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            error = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert not error
+        assert share.has_artifact('test', 'target.bst', element_key)
+
+
+def _test_push(user_config_file, project_dir, artifact_dir,
+               element_name, element_key, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Load the target element
+    element = project.load_elements([element_name], cas)[0]
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes(element=element):
+        # Push the element's artifact
+        if not cas.push(element, [element_key]):
+            queue.put("Push operation failed")
+        else:
+            queue.put(None)
+    else:
+        queue.put("No remote configured for element {}".format(element_name))
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_directory(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # First build the project without the artifact cache configured
+    result = cli.run(project=project_dir, args=['build', 'target.bst'])
+    result.assert_success()
+
+    # Assert that we are now cached locally
+    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        context.set_message_handler(message_handler)
+
+        # Load the project and CAS cache
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+        cas = CASCache(context)
+
+        # Assert that the element's artifact is cached
+        element = project.load_elements(['target.bst'], cas)[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert cas.contains(element, element_key)
+
+        # Manually setup the CAS remote
+        cas.setup_remotes(use_config=True)
+        cas.initialize_remotes()
+        assert cas.has_push_remotes(element=element)
+
+        # Recreate the CasBasedDirectory object from the cached artifact
+        artifact_ref = cas.get_artifact_fullname(element, element_key)
+        artifact_digest = cas.resolve_ref(artifact_ref)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_push_directory,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                artifact_digest, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            directory_hash = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert directory_hash
+        assert artifact_digest.hash == directory_hash
+        assert share.has_object(artifact_digest)
+
+
+def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes():
+        # Create a CasBasedDirectory from local CAS cache content
+        directory = CasBasedDirectory(context, ref=artifact_digest)
+
+        # Push the CasBasedDirectory object
+        directory_digest = cas.push_directory(project, directory)
+
+        queue.put(directory_digest.hash)
+    else:
+        queue.put("No remote configured")
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_message(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_push_message,
+                                          args=(user_config_file, project_dir, artifact_dir, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            message_hash, message_size = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert message_hash and message_size
+        message_digest = remote_execution_pb2.Digest(hash=message_hash,
+                                                     size_bytes=message_size)
+        assert share.has_object(message_digest)
+
+
+def _test_push_message(user_config_file, project_dir, artifact_dir, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes():
+        # Create an example message object
+        command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
+                                               working_directory='/buildstream-build',
+                                               output_directories=['/buildstream-install'])
+
+        # Push the message object
+        command_digest = cas.push_message(project, command)
+
+        queue.put((command_digest.hash, command_digest.size_bytes))
+    else:
+        queue.put("No remote configured")
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 05e87a4..e3f709b 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -15,6 +15,7 @@
 from buildstream._artifactcache.casserver import create_server
 from buildstream._context import Context
 from buildstream._exceptions import ArtifactError
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 
 
 # ArtifactShare()
@@ -87,6 +88,23 @@
         # Sleep until termination by signal
         signal.pause()
 
+    # has_object():
+    #
+    # Checks whether the object is present in the share
+    #
+    # Args:
+    #    digest (str): The object's digest
+    #
+    # Returns:
+    #    (bool): True if the object exists in the share, otherwise false.
+    def has_object(self, digest):
+
+        assert isinstance(digest, remote_execution_pb2.Digest)
+
+        object_path = self.cas.objpath(digest)
+
+        return os.path.exists(object_path)
+
     # has_artifact():
     #
     # Checks whether the artifact is present in the share