Merge pull request #1431 from apache/juerg/remote-cache
Support `storage-service` in cache configuration
diff --git a/doc/source/using_config.rst b/doc/source/using_config.rst
index acfff95..7391e93 100644
--- a/doc/source/using_config.rst
+++ b/doc/source/using_config.rst
@@ -96,8 +96,8 @@
.. _config_local_cache:
-Local cache control
--------------------
+Cache control
+-------------
Beyond deciding what directory you intend to place the cache, there are
some controls on what is cached locally and how.
@@ -121,6 +121,17 @@
# Avoid caching build trees if we don't need them
cache-buildtrees: auto
+ #
+ # Use a CAS server as a remote cache
+ # Useful to minimize network traffic with remote execution
+ # or to work with limited local disk space
+ storage-service:
+ url: https://cache-server.com/cas:11001
+ auth:
+ server-cert: server.crt
+ client-cert: client.crt
+ client-key: client.key
+
Attributes
~~~~~~~~~~
@@ -178,6 +189,40 @@
* ``auto``: Only cache the build trees where necessary (e.g. for failed builds)
* ``always``: Always cache the build tree.
+* ``storage-service``
+
+ An optional :ref:`service configuration <user_config_remote_execution_service>`
+ to use a *Content Addressable Storage* service as a remote cache. Write access
+ is required.
+
+ This service is compatible with the *storage* service offered by
+ :ref:`cache servers <config_cache_servers>`.
+
+ Without this option, all content is stored in the local cache. This includes
+ CAS objects from fetched sources, build outputs, and pulled artifacts.
+ With this option, content is primarily stored in the remote cache and the
+ local cache is populated only as needed. For example, ``bst artifact checkout``
+ will download CAS objects on demand from the remote cache.
+ This feature is incompatible with offline operation.
+
+ This is primarily useful in combination with
+ :ref:`remote execution <user_config_remote_execution>` to minimize downloads
+ of build outputs, which may not be needed locally. The elimination of
+ unnecessary downloads reduces total build time, especially if the bandwidth
+ between the local system and the remote execution cluster is limited.
+
+ .. tip::
+
Omit the ``storage-service`` option in the
+ :ref:`remote execution <user_config_remote_execution>` configuration to
+ use the same CAS service for caching and remote execution.
+
+ It is also possible to use this option for local builds without remote
+ execution. This enables operation with a small local cache even for large
+ projects. However, for local builds this can result in a longer total build
+ time due to the additional network transfers, so it is only recommended with
+ a high-bandwidth connection to the storage service, ideally on a local network.
+
Scheduler controls
------------------
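
To make the configuration wiring concrete: later in this diff, `_context.py`
parses the `storage-service` node into a `RemoteSpec`, and `CASDProcessManager`
forwards it to buildbox-casd as command-line flags. A self-contained sketch of
that translation, with a stub dataclass standing in for the real `RemoteSpec`
from `_remotespec.py`:

from dataclasses import dataclass
from typing import List, Optional


# Stub standing in for buildstream._remotespec.RemoteSpec (illustrative only).
@dataclass
class RemoteSpec:
    url: str
    instance_name: Optional[str] = None
    server_cert: Optional[str] = None
    client_cert: Optional[str] = None
    client_key: Optional[str] = None


def casd_remote_cache_args(spec: RemoteSpec) -> List[str]:
    # Mirrors the flag construction added to CASDProcessManager in this diff.
    args = ["--cas-remote={}".format(spec.url)]
    if spec.instance_name:
        args.append("--cas-instance={}".format(spec.instance_name))
    if spec.server_cert:
        args.append("--cas-server-cert={}".format(spec.server_cert))
    if spec.client_key:
        args.append("--cas-client-key={}".format(spec.client_key))
        args.append("--cas-client-cert={}".format(spec.client_cert))
    return args


print(casd_remote_cache_args(RemoteSpec(url="https://cache-server.com/cas:11001")))
# ['--cas-remote=https://cache-server.com/cas:11001']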
@@ -900,7 +945,6 @@
.. code:: yaml
remote-execution:
- pull-artifact-files: True
execution-service:
url: http://execution.fallback.example.com:50051
instance-name: main
@@ -918,16 +962,6 @@
Attributes
''''''''''
-* ``pull-artifact-files``
-
- This determines whether you want the artifacts which were built remotely
- to be downloaded into the local CAS, so that it is ready for checkout
- directly after a built completes.
-
- If this is set to ``false``, then you will need to download the artifacts
- you intend to use with :ref:`bst artifact checkout <invoking_artifact_checkout>`
- after your build completes.
-
* ``execution-service``
A :ref:`service configuration <user_config_remote_execution_service>` specifying
@@ -943,6 +977,11 @@
This service is compatible with the *storage* service offered by
:ref:`cache servers <config_cache_servers>`.
+ This is optional if a ``storage-service`` is configured in the
+ :ref:`cache configuration <config_local_cache>`, in which case the actual
+ file contents of build outputs will only be downloaded as needed, e.g. on
+ ``bst artifact checkout``.
+
* ``action-cache-service``
A :ref:`service configuration <user_config_remote_execution_service>` specifying
@@ -1095,7 +1134,6 @@
# remote execution configuration.
#
remote-execution:
- pull-artifact-files: True
execution-service:
url: http://execution.example.com:50051
instance-name: main
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index 28e2f59..163e2ae 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -551,24 +551,13 @@
# (bool): Whether artifact is in local cache
#
def query_cache(self):
- context = self._context
-
artifact = self._load_proto()
if not artifact:
self._cached = False
return False
- # Determine whether directories are required
- require_directories = context.require_artifact_directories
- # Determine whether file contents are required as well
- require_files = context.require_artifact_files or self._element._artifact_files_required()
-
# Check whether 'files' subdirectory is available, with or without file contents
- if (
- require_directories
- and str(artifact.files)
- and not self._cas.contains_directory(artifact.files, with_files=require_files)
- ):
+ if str(artifact.files) and not self._cas.contains_directory(artifact.files, with_files=True):
self._cached = False
return False
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index b80460a..609feb5 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -37,7 +37,7 @@
from .._exceptions import CASCacheError
from .casdprocessmanager import CASDProcessManager
-from .casremote import _CASBatchRead, _CASBatchUpdate, BlobNotFound
+from .casremote import CASRemote, _CASBatchRead, _CASBatchUpdate, BlobNotFound
_BUFFER_SIZE = 65536
@@ -69,6 +69,7 @@
*,
casd=True,
cache_quota=None,
+ remote_cache_spec=None,
protect_session_blobs=True,
log_level=CASLogLevel.WARNING,
log_directory=None
@@ -80,18 +81,25 @@
self._cache_usage_monitor = None
self._cache_usage_monitor_forbidden = False
+ self._remote_cache = bool(remote_cache_spec)
+
self._casd_process_manager = None
self._casd_channel = None
if casd:
assert log_directory is not None, "log_directory is required when casd is True"
log_dir = os.path.join(log_directory, "_casd")
self._casd_process_manager = CASDProcessManager(
- path, log_dir, log_level, cache_quota, protect_session_blobs
+ path, log_dir, log_level, cache_quota, remote_cache_spec, protect_session_blobs
)
self._casd_channel = self._casd_process_manager.create_channel()
self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
self._cache_usage_monitor.start()
+ else:
+ assert not self._remote_cache
+
+ self._default_remote = CASRemote(None, self)
+ self._default_remote.init()
# get_cas():
#
@@ -142,6 +150,9 @@
self._casd_process_manager.release_resources(messenger)
self._casd_process_manager = None
+ def get_default_remote(self):
+ return self._default_remote
+
# contains_files():
#
# Check whether file digests exist in the local CAS cache
@@ -168,13 +179,17 @@
def contains_directory(self, digest, *, with_files):
local_cas = self.get_local_cas()
+ # Without a remote cache, `FetchTree` simply checks the local cache.
request = local_cas_pb2.FetchTreeRequest()
request.root_digest.CopyFrom(digest)
- request.fetch_file_blobs = with_files
+ # Always fetch Directory protos as they are needed to enumerate subdirectories and files.
+ # Don't implicitly fetch file blobs from the remote cache as we don't need them.
+ request.fetch_file_blobs = with_files and not self._remote_cache
try:
local_cas.FetchTree(request)
- return True
+ if not self._remote_cache:
+ return True
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
return False
@@ -182,6 +197,10 @@
raise CASCacheError("Unsupported buildbox-casd version: FetchTree unimplemented") from e
raise
+ # Check whether everything is available in the remote cache.
+ missing_blobs = self.missing_blobs_for_directory(digest, remote=self._default_remote)
+ return not missing_blobs
+
# checkout():
#
# Checkout the specified directory digest.
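
With a remote cache configured, `contains_directory()` above becomes a
two-phase check: `FetchTree` materializes only the Directory protos locally,
and a separate missing-blobs query confirms that the remote holds everything
else. A minimal sketch of that control flow, with callables standing in for
the casd `FetchTree` RPC and `missing_blobs_for_directory()`:

def contains_directory(digest, *, with_files, remote_cache, fetch_tree, remote_missing_blobs):
    # fetch_tree(digest, fetch_file_blobs=...) stands in for the
    # local_cas.FetchTree RPC and returns False on NOT_FOUND.
    # Directory protos are always fetched; file blobs only when they are
    # needed locally and no remote cache can serve them on demand.
    if not fetch_tree(digest, fetch_file_blobs=with_files and not remote_cache):
        return False
    if not remote_cache:
        return True
    # With a remote cache, the local check is not enough: every blob must
    # also be present remotely.
    return not remote_missing_blobs(digest)


# Trivial demo: tree known locally, nothing missing remotely.
print(contains_directory("d0", with_files=True, remote_cache=True,
                         fetch_tree=lambda d, fetch_file_blobs: True,
                         remote_missing_blobs=lambda d: []))  # True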
@@ -191,7 +210,17 @@
# tree (Digest): The directory digest to extract
# can_link (bool): Whether we can create hard links in the destination
#
- def checkout(self, dest, tree, *, can_link=False):
+ def checkout(self, dest, tree, *, can_link=False, _fetch=True):
+ if _fetch and self._remote_cache:
+ # We need the files in the local cache
+ local_cas = self.get_local_cas()
+
+ request = local_cas_pb2.FetchTreeRequest()
+ request.root_digest.CopyFrom(tree)
+ request.fetch_file_blobs = True
+
+ local_cas.FetchTree(request)
+
os.makedirs(dest, exist_ok=True)
directory = remote_execution_pb2.Directory()
@@ -229,7 +258,7 @@
for dirnode in directory.directories:
fullpath = os.path.join(dest, dirnode.name)
- self.checkout(fullpath, dirnode.digest, can_link=can_link)
+ self.checkout(fullpath, dirnode.digest, can_link=can_link, _fetch=False)
for symlinknode in directory.symlinks:
# symlink
@@ -286,6 +315,11 @@
objpath = self.objpath(digest)
+ if self._remote_cache and not os.path.exists(objpath):
+ batch = _CASBatchRead(self._default_remote)
+ batch.add(digest)
+ batch.send()
+
return open(objpath, mode=mode)
# add_object():
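
The `open()` change above follows the same lazy-materialization pattern: if
the object file is absent locally and a remote cache is configured, it is
fetched in a single-digest batch before opening. The pattern in isolation,
with a stub callable in place of the `_CASBatchRead` round trip:

import os


def open_object(digest, objpath, *, remote_cache, fetch_blob, mode="rb"):
    # Lazily materialize a CAS object before opening it; fetch_blob(digest)
    # stands in for a one-element _CASBatchRead against the default remote.
    if remote_cache and not os.path.exists(objpath):
        fetch_blob(digest)
    return open(objpath, mode)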
@@ -399,7 +433,7 @@
if tree_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
raise CASCacheError("Cache too full", reason="cache-too-full")
if tree_response.status.code != code_pb2.OK:
- raise CASCacheError("Failed to capture tree {}: {}".format(path, tree_response.status.code))
+ raise CASCacheError("Failed to capture tree {}: {}".format(path, tree_response.status))
treepath = self.objpath(tree_response.tree_digest)
tree = remote_execution_pb2.Tree()
@@ -469,10 +503,20 @@
# Generator that returns the Digests of all blobs in the tree specified by
# the Digest of the toplevel Directory object.
#
- def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None):
+ def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None, _fetch_tree=True):
if not excluded_subdirs:
excluded_subdirs = []
+ if self._remote_cache and _fetch_tree:
+ # Ensure we have the directory protos in the local cache
+ local_cas = self.get_local_cas()
+
+ request = local_cas_pb2.FetchTreeRequest()
+ request.root_digest.CopyFrom(directory_digest)
+ request.fetch_file_blobs = False
+
+ local_cas.FetchTree(request)
+
# parse directory, and recursively add blobs
yield directory_digest
@@ -487,7 +531,7 @@
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
- yield from self.required_blobs_for_directory(dirnode.digest)
+ yield from self.required_blobs_for_directory(dirnode.digest, _fetch_tree=False)
################################################
# Local Private Methods #
@@ -569,6 +613,10 @@
# Returns: The Digests of the blobs that were not available on the remote CAS
#
def fetch_blobs(self, remote, digests, *, allow_partial=False):
+ if self._remote_cache:
+ # Determine blobs missing in the remote cache and only fetch those
+ digests = self.missing_blobs(digests)
+
missing_blobs = [] if allow_partial else None
remote.init()
@@ -581,6 +629,15 @@
batch.send(missing_blobs=missing_blobs)
+ if self._remote_cache:
+ # Upload fetched blobs to the remote cache as we can't transfer
+ # blobs directly from another remote to the remote cache.
+ batch = _CASBatchUpdate(self._default_remote)
+ for digest in digests:
+ if missing_blobs is None or digest not in missing_blobs: # pylint: disable=unsupported-membership-test
+ batch.add(digest)
+ batch.send()
+
return missing_blobs
# send_blobs():
@@ -592,6 +649,17 @@
# digests (list): The Digests of Blobs to upload
#
def send_blobs(self, remote, digests):
+ if self._remote_cache:
+ # First fetch missing blobs from the remote cache as we can't
+ # transfer blobs directly from the remote cache to another remote.
+
+ remote_missing_blobs = self.missing_blobs(digests, remote=remote)
+
+ batch = _CASBatchRead(self._default_remote)
+ for digest in remote_missing_blobs:
+ batch.add(digest)
+ batch.send()
+
batch = _CASBatchUpdate(remote)
for digest in digests:
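
Both transfer directions above share one constraint: casd cannot stream blobs
directly between two remotes, so the local cache acts as a relay. In
`fetch_blobs()` the blobs flow from the other remote through the local cache
to the remote cache; `send_blobs()` is the mirror image. Schematically, with
callables standing in for the `_CASBatchRead`/`_CASBatchUpdate` batches:

def relay_fetch(digests, *, pull_from_other, push_to_cache):
    # fetch_blobs() with a remote cache: other remote -> local -> remote cache.
    pull_from_other(digests)
    push_to_cache(digests)


def relay_send(digests, *, pull_from_cache, push_to_other):
    # send_blobs() with a remote cache: remote cache -> local -> other remote.
    pull_from_cache(digests)
    push_to_other(digests)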
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 11a16f2..0a7d768 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -51,10 +51,11 @@
# log_dir (str): The directory for the logs
# log_level (LogLevel): Log level to give to buildbox-casd for logging
# cache_quota (int): User configured cache quota
+# remote_cache_spec (RemoteSpec): Optional remote cache server
# protect_session_blobs (bool): Disable expiry for blobs used in the current session
#
class CASDProcessManager:
- def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+ def __init__(self, path, log_dir, log_level, cache_quota, remote_cache_spec, protect_session_blobs):
self._log_dir = log_dir
self._socket_path = self._make_socket_path(path)
@@ -71,6 +72,16 @@
if protect_session_blobs:
casd_args.append("--protect-session-blobs")
+ if remote_cache_spec:
+ casd_args.append("--cas-remote={}".format(remote_cache_spec.url))
+ if remote_cache_spec.instance_name:
+ casd_args.append("--cas-instance={}".format(remote_cache_spec.instance_name))
+ if remote_cache_spec.server_cert:
+ casd_args.append("--cas-server-cert={}".format(remote_cache_spec.server_cert))
+ if remote_cache_spec.client_key:
+ casd_args.append("--cas-client-key={}".format(remote_cache_spec.client_key))
+ casd_args.append("--cas-client-cert={}".format(remote_cache_spec.client_cert))
+
casd_args.append(path)
self._start_time = time.time()
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 3799f95..b9ae3d7 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -55,6 +55,11 @@
# be called outside of init().
#
def _configure_protocols(self):
+ if not self.spec:
+ # Remote cache (handled by default instance in casd)
+ self.local_cas_instance_name = ""
+ return
+
local_cas = self.cascache.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemotesRequest()
cas_endpoint = request.content_addressable_storage
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 5ddd446..3a89736 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -109,7 +109,7 @@
logger.addHandler(handler)
casd_manager = CASDProcessManager(
- os.path.abspath(repo), os.path.join(os.path.abspath(repo), "logs"), log_level, quota, False
+ os.path.abspath(repo), os.path.join(os.path.abspath(repo), "logs"), log_level, quota, None, False
)
casd_channel = casd_manager.create_channel()
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 7a46caa..540f376 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -175,8 +175,8 @@
# User specified cache quota, used for display messages
self.config_cache_quota_string: Optional[str] = None
- # Whether to pull the files of an artifact when doing remote execution
- self.pull_artifact_files: bool = True
+ # Remote cache server
+ self.remote_cache_spec: Optional[RemoteSpec] = None
# Whether or not to attempt to pull build trees globally
self.pull_buildtrees: Optional[bool] = None
@@ -184,12 +184,6 @@
# Whether or not to cache build trees on artifact creation
self.cache_buildtrees: Optional[str] = None
- # Whether directory trees are required for all artifacts in the local cache
- self.require_artifact_directories: bool = True
-
- # Whether file contents are required for all artifacts in the local cache
- self.require_artifact_files: bool = True
-
# Don't shoot the messenger
self.messenger: Messenger = Messenger()
@@ -352,7 +346,7 @@
# We need to find the first existing directory in the path of our
# casdir - the casdir may not have been created yet.
cache = defaults.get_mapping("cache")
- cache.validate_keys(["quota", "pull-buildtrees", "cache-buildtrees"])
+ cache.validate_keys(["quota", "storage-service", "pull-buildtrees", "cache-buildtrees"])
cas_volume = self.casdir
while not os.path.exists(cas_volume):
@@ -368,6 +362,10 @@
LoadErrorReason.INVALID_DATA,
) from e
+ remote_cache = cache.get_mapping("storage-service", default=None)
+ if remote_cache:
+ self.remote_cache_spec = RemoteSpec.new_from_node(remote_cache)
+
# Load global artifact cache configuration
cache_config = defaults.get_mapping("artifacts", default={})
self._global_artifact_cache_config = _CacheConfig.new_from_node(cache_config)
@@ -376,10 +374,10 @@
cache_config = defaults.get_mapping("source-caches", default={})
self._global_source_cache_config = _CacheConfig.new_from_node(cache_config)
- # Load the global remote execution config including pull-artifact-files setting
+ # Load the global remote execution config
remote_execution = defaults.get_mapping("remote-execution", default=None)
if remote_execution:
- self.pull_artifact_files, self.remote_execution_specs = self._load_remote_execution(remote_execution)
+ self.remote_execution_specs = self._load_remote_execution(remote_execution)
# Load pull build trees configuration
self.pull_buildtrees = cache.get_bool("pull-buildtrees")
@@ -549,7 +547,7 @@
override_node = self.get_overrides(project.name)
remote_execution = override_node.get_mapping("remote-execution", default=None)
if remote_execution:
- self.pull_artifact_files, self.remote_execution_specs = self._load_remote_execution(remote_execution)
+ self.remote_execution_specs = self._load_remote_execution(remote_execution)
#
# Maintain our list of remote specs for artifact and source caches
@@ -657,15 +655,6 @@
# value which we cache here too.
return self._strict_build_plan
- # set_artifact_files_optional()
- #
- # This indicates that the current context (command or configuration)
- # does not require file contents of all artifacts to be available in the
- # local cache.
- #
- def set_artifact_files_optional(self) -> None:
- self.require_artifact_files = False
-
def get_cascache(self) -> CASCache:
if self._cascache is None:
if self.log_debug:
@@ -679,6 +668,7 @@
self.cachedir,
casd=self.use_casd,
cache_quota=self.config_cache_quota,
+ remote_cache_spec=self.remote_cache_spec,
log_level=log_level,
log_directory=self.logdir,
)
@@ -758,18 +748,5 @@
if not os.environ.get("XDG_DATA_HOME"):
os.environ["XDG_DATA_HOME"] = os.path.expanduser("~/.local/share")
- def _load_remote_execution(self, node: MappingNode) -> Tuple[bool, Optional[RemoteExecutionSpec]]:
- # The pull_artifact_files attribute is special, it is allowed to
- # be set to False even if there is no remote execution service configured.
- #
- pull_artifact_files: bool = node.get_bool("pull-artifact-files", default=True)
- node.safe_del("pull-artifact-files")
-
- # Don't pass the remote execution settings if that was the only option
- remote_execution_specs: Optional[RemoteExecutionSpec]
- if node.keys():
- remote_execution_specs = RemoteExecutionSpec.new_from_node(node)
- else:
- remote_execution_specs = None
-
- return pull_artifact_files, remote_execution_specs
+ def _load_remote_execution(self, node: MappingNode) -> Optional[RemoteExecutionSpec]:
+ return RemoteExecutionSpec.new_from_node(node, remote_cache=bool(self.remote_cache_spec))
diff --git a/src/buildstream/_elementproxy.py b/src/buildstream/_elementproxy.py
index a7b1f09..53780ea 100644
--- a/src/buildstream/_elementproxy.py
+++ b/src/buildstream/_elementproxy.py
@@ -146,9 +146,6 @@
def get_variable(self, varname: str) -> Optional[str]:
return cast("Element", self._plugin).get_variable(varname)
- def get_logs(self) -> List[str]:
- return cast("Element", self._plugin).get_logs()
-
##############################################################
# Element Internal APIs #
##############################################################
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 589e548..4bdf7c9 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -457,6 +457,11 @@
starttime = datetime.datetime.now()
text = ""
+ def format_spec(spec):
+ if spec.instance_name:
+ return "{} (instance: {})".format(spec.url, spec.instance_name)
+ return spec.url
+
self._resolved_keys = {element: element._get_cache_key() for element in stream.session_elements}
# Main invocation context
@@ -483,24 +488,23 @@
values["Maximum Build Tasks"] = context.sched_builders
values["Maximum Push Tasks"] = context.sched_pushers
values["Maximum Network Retries"] = context.sched_network_retries
+
+ if context.remote_cache_spec:
+ values["Cache Storage Service"] = format_spec(context.remote_cache_spec)
+
text += self._format_values(values)
if context.remote_execution_specs:
specs = context.remote_execution_specs
- def format_spec(spec):
- if spec.instance_name:
- return "{} (instance: {})".format(spec.url, spec.instance_name)
- return spec.url
-
text += "\n"
text += self.content_profile.fmt("Remote Execution Configuration\n", bold=True)
values = OrderedDict()
values["Execution Service"] = format_spec(specs.exec_spec)
- values["Storage Service"] = format_spec(specs.storage_spec)
+ re_storage_spec = specs.storage_spec or context.remote_cache_spec
+ values["Storage Service"] = format_spec(re_storage_spec)
if specs.action_spec:
values["Action Cache Service"] = format_spec(specs.action_spec)
- values["Pull artifact files"] = context.pull_artifact_files
text += self._format_values(values)
# Print information about each loaded project
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index 0d47921..42314eb 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -52,7 +52,10 @@
return False
def __str__(self):
- return self.spec.url
+ if self.spec:
+ return self.spec.url
+ else:
+ return "(default remote)"
####################################################
# Remote API #
@@ -68,7 +71,9 @@
if self._initialized:
return
- self.channel = self.spec.open_channel()
+ if self.spec:
+ self.channel = self.spec.open_channel()
+
self._configure_protocols()
self._initialized = True
diff --git a/src/buildstream/_remotespec.py b/src/buildstream/_remotespec.py
index a37698e..0e27217 100644
--- a/src/buildstream/_remotespec.py
+++ b/src/buildstream/_remotespec.py
@@ -454,9 +454,11 @@
# communicate with various components of an RE build cluster.
#
class RemoteExecutionSpec:
- def __init__(self, exec_spec: RemoteSpec, storage_spec: RemoteSpec, action_spec: Optional[RemoteSpec]) -> None:
+ def __init__(
+ self, exec_spec: RemoteSpec, storage_spec: Optional[RemoteSpec], action_spec: Optional[RemoteSpec]
+ ) -> None:
self.exec_spec: RemoteSpec = exec_spec
- self.storage_spec: RemoteSpec = storage_spec
+ self.storage_spec: Optional[RemoteSpec] = storage_spec
self.action_spec: Optional[RemoteSpec] = action_spec
# new_from_node():
@@ -474,15 +476,30 @@
# LoadError: If the node is malformed.
#
@classmethod
- def new_from_node(cls, node: MappingNode, basedir: Optional[str] = None) -> "RemoteExecutionSpec":
+ def new_from_node(
+ cls, node: MappingNode, basedir: Optional[str] = None, *, remote_cache: bool = False
+ ) -> "RemoteExecutionSpec":
node.validate_keys(["execution-service", "storage-service", "action-cache-service"])
exec_node = node.get_mapping("execution-service")
- storage_node = node.get_mapping("storage-service")
+ storage_node = node.get_mapping("storage-service", default=None)
+ if not storage_node and not remote_cache:
+ provenance = node.get_provenance()
+ raise LoadError(
+ "{}: Remote execution requires 'storage-service' to be specified in the 'remote-execution' section if not already specified globally in the 'cache' section".format(
+ provenance
+ ),
+ LoadErrorReason.INVALID_DATA,
+ )
action_node = node.get_mapping("action-cache-service", default=None)
exec_spec = RemoteSpec.new_from_node(exec_node, basedir, remote_execution=True)
- storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+
+ storage_spec: Optional[RemoteSpec]
+ if storage_node:
+ storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+ else:
+ storage_spec = None
action_spec: Optional[RemoteSpec]
if action_node:
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..fcde00d 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -25,6 +25,7 @@
from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
+from .queues.cachequeryqueue import CacheQueryQueue
from .scheduler import Scheduler, SchedStatus
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/queues/cachequeryqueue.py b/src/buildstream/_scheduler/queues/cachequeryqueue.py
new file mode 100644
index 0000000..b650a91
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/cachequeryqueue.py
@@ -0,0 +1,66 @@
+#
+# Copyright (C) 2020 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+from ...types import _KeyStrength
+
+
+# A queue which queries the cache for artifacts and sources
+#
+class CacheQueryQueue(Queue):
+
+ action_name = "Cache-query"
+ complete_name = "Cache queried"
+ resources = [ResourceType.PROCESS, ResourceType.CACHE]
+
+ def __init__(self, scheduler, *, sources=False):
+ super().__init__(scheduler)
+
+ self._sources = sources
+
+ def get_process_func(self):
+ if not self._sources:
+ return CacheQueryQueue._query_artifacts_or_sources
+ else:
+ return CacheQueryQueue._query_sources
+
+ def status(self, element):
+ if not element._get_cache_key(strength=_KeyStrength.WEAK):
+ # Strict and weak cache keys are unavailable if the element or
+ # a dependency has an unresolved source
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
+
+ def done(self, _, element, result, status):
+ if status is JobStatus.FAIL:
+ return
+
+ if not self._sources:
+ if not element._pull_pending():
+ element._load_artifact_done()
+
+ @staticmethod
+ def _query_artifacts_or_sources(element):
+ element._load_artifact(pull=False)
+ if not element._can_query_cache() or not element._cached_success():
+ element._query_source_cache()
+
+ @staticmethod
+ def _query_sources(element):
+ element._query_source_cache()
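
In the `_stream.py` hunk that follows, this queue replaces the serial
cache-query loop whenever a remote cache is configured: each query then
involves a network round trip, so it pays to run the queries through the
scheduler in parallel. The decision in outline, with callables standing in
for the real queue run and per-element query:

def query_cache_status(plan, *, remote_cache, run_cache_query_queue, query_element):
    # Schematic of the Stream._load() change below: parallelize via
    # CacheQueryQueue when queries hit the network, otherwise stay serial.
    if remote_cache:
        run_cache_query_queue(plan)
    else:
        for element in plan:
            query_element(element)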
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d03480e..12c6638 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,6 +38,7 @@
Scheduler,
SchedStatus,
TrackQueue,
+ CacheQueryQueue,
FetchQueue,
SourcePushQueue,
BuildQueue,
@@ -206,20 +207,31 @@
# Enqueue complete build plan as this is required to determine `buildable` status.
plan = list(_pipeline.dependencies(elements, _Scope.ALL))
- for element in plan:
- if element._can_query_cache():
- # Cache status already available.
- # This is the case for artifact elements, which load the
- # artifact early on.
- pass
- elif not only_sources and element._get_cache_key(strength=_KeyStrength.WEAK):
- element._load_artifact(pull=False)
- if sources_of_cached_elements or not element._can_query_cache() or not element._cached_success():
+ if self._context.remote_cache_spec:
+ # Parallelize cache queries if a remote cache is configured
+ self._reset()
+ self._add_queue(CacheQueryQueue(self._scheduler, sources=only_sources), track=True)
+ self._enqueue_plan(plan)
+ self._run()
+ else:
+ for element in plan:
+ if element._can_query_cache():
+ # Cache status already available.
+ # This is the case for artifact elements, which load the
+ # artifact early on.
+ pass
+ elif not only_sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+ element._load_artifact(pull=False)
+ if (
+ sources_of_cached_elements
+ or not element._can_query_cache()
+ or not element._cached_success()
+ ):
+ element._query_source_cache()
+ if not element._pull_pending():
+ element._load_artifact_done()
+ elif element._has_all_sources_resolved():
element._query_source_cache()
- if not element._pull_pending():
- element._load_artifact_done()
- elif element._has_all_sources_resolved():
- element._query_source_cache()
# shell()
#
@@ -378,17 +390,6 @@
# Assert that the elements are consistent
_pipeline.assert_consistent(self._context, elements)
- if self._context.remote_execution_specs:
- # Remote execution is configured.
- # Require artifact files only for target elements and their runtime dependencies.
- self._context.set_artifact_files_optional()
-
- # fetch blobs of targets if options set
- if self._context.pull_artifact_files:
- scope = _Scope.ALL if selection == _PipelineSelection.ALL else _Scope.RUN
- for element in self.targets:
- element._set_artifact_files_required(scope=scope)
-
source_push_enabled = self._sourcecache.has_push_remotes()
# If source push is enabled, the source cache status of all elements
@@ -437,6 +438,11 @@
ignore_project_source_remotes: bool = False,
):
+ if self._context.remote_cache_spec:
+ self._context.messenger.warn(
+ "Cache Storage Service is configured, fetched sources may not be available in the local cache"
+ )
+
elements = self._load(
targets,
selection=selection,
@@ -555,6 +561,11 @@
ignore_project_artifact_remotes: bool = False,
):
+ if self._context.remote_cache_spec:
+ self._context.messenger.warn(
+ "Cache Storage Service is configured, pulled artifacts may not be available in the local cache"
+ )
+
elements = self._load(
targets,
selection=selection,
@@ -796,7 +807,7 @@
self._context.messenger.warn("{} is cached without log files".format(ref))
continue
- artifact_logs[obj.name] = obj.get_logs()
+ artifact_logs[obj.name] = obj._get_logs()
return artifact_logs
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 8a484a2..38240bb 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -296,7 +296,6 @@
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
self.__tainted = None # Whether the artifact is tainted and should not be shared
self.__required = False # Whether the artifact is required in the current session
- self.__artifact_files_required = False # Whether artifact files are required in the local cache
self.__build_result = None # The result of assembling this Element (success, description, detail)
# Artifact class for direct artifact composite interaction
self.__artifact = None # type: Optional[Artifact]
@@ -839,14 +838,6 @@
self.__batch_prepare_assemble_flags = flags
self.__batch_prepare_assemble_collect = collect
- def get_logs(self) -> List[str]:
- """Obtain a list of log file paths
-
- Returns:
- A list of log file paths
- """
- return cast(Artifact, self.__artifact).get_logs()
-
#############################################################
# Private Methods used in BuildStream #
#############################################################
@@ -1563,31 +1554,6 @@
def _is_required(self):
return self.__required
- # _set_artifact_files_required():
- #
- # Mark artifact files for this element and its runtime dependencies as
- # required in the local cache.
- #
- def _set_artifact_files_required(self, scope=_Scope.RUN):
- assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
-
- if self.__artifact_files_required:
- # Already done
- return
-
- self.__artifact_files_required = True
-
- # Request artifact files of runtime dependencies
- for dep in self._dependencies(scope, recurse=False):
- dep._set_artifact_files_required(scope=scope)
-
- # _artifact_files_required():
- #
- # Returns whether artifact files for this element have been marked as required.
- #
- def _artifact_files_required(self):
- return self.__artifact_files_required
-
# __should_schedule()
#
# Returns:
@@ -2498,6 +2464,16 @@
self.__whitelist_regex = re.compile(expression)
return self.__whitelist_regex.match(os.path.join(os.sep, path))
+ # _get_logs()
+ #
+ # Obtain a list of log file paths
+ #
+ # Returns:
+ # A list of log file paths
+ #
+ def _get_logs(self) -> List[str]:
+ return cast(Artifact, self.__artifact).get_logs()
+
#############################################################
# Private Local Methods #
#############################################################
@@ -2795,8 +2771,6 @@
self.info("Using a remote sandbox for artifact {} with directory '{}'".format(self.name, directory))
- output_files_required = context.require_artifact_files or self._artifact_files_required()
-
sandbox = SandboxRemote(
context,
project,
@@ -2805,7 +2779,6 @@
stdout=stdout,
stderr=stderr,
config=config,
- output_files_required=output_files_required,
output_node_properties=output_node_properties,
)
yield sandbox
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index af50051..d1e1c8f 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -41,9 +41,9 @@
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self._output_files_required = kwargs.get("output_files_required", True)
-
context = self._get_context()
+ cascache = context.get_cascache()
+
specs = context.remote_execution_specs
if specs is None:
return
@@ -53,6 +53,17 @@
self.action_spec = specs.action_spec
self.operation_name = None
+ if self.storage_spec:
+ self.storage_remote = CASRemote(self.storage_spec, cascache)
+ try:
+ self.storage_remote.init()
+ except grpc.RpcError as e:
+ raise SandboxError(
+ "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
+ ) from e
+ else:
+ self.storage_remote = cascache.get_default_remote()
+
def run_remote_command(self, channel, action_digest):
# Sends an execution request to the remote execution server.
#
@@ -140,29 +151,16 @@
def _fetch_missing_blobs(self, vdir):
context = self._get_context()
- project = self._get_project()
cascache = context.get_cascache()
- artifactcache = context.artifactcache
- # Fetch the file blobs if needed
- if self._output_files_required or artifactcache.has_push_remotes():
+ # Fetch the file blobs
+ if self.storage_spec:
dir_digest = vdir._get_digest()
required_blobs = cascache.required_blobs_for_directory(dir_digest)
local_missing_blobs = cascache.missing_blobs(required_blobs)
if local_missing_blobs:
- if self._output_files_required:
- # Fetch all blobs from Remote Execution CAS server
- blobs_to_fetch = local_missing_blobs
- else:
- # Output files are not required in the local cache,
- # however, artifact push remotes will need them.
- # Only fetch blobs that are missing on one or multiple
- # artifact servers.
- blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs)
-
- with CASRemote(self.storage_spec, cascache) as casremote:
- cascache.fetch_blobs(casremote, blobs_to_fetch)
+ cascache.fetch_blobs(self.storage_remote, local_missing_blobs)
def _execute_action(self, action, flags):
stdout, stderr = self._get_output()
@@ -174,46 +172,40 @@
action_digest = cascache.add_object(buffer=action.SerializeToString())
+ casremote = self.storage_remote
+
# check action cache download and download if there
action_result = self._check_action_cache(action_digest)
if not action_result:
- with CASRemote(self.storage_spec, cascache) as casremote:
+ with self._get_context().messenger.timed_activity(
+ "Uploading input root", element_name=self._get_element_name()
+ ):
+ # Determine blobs missing on remote
try:
- casremote.init()
+ input_root_digest = action.input_root_digest
+ missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
except grpc.RpcError as e:
- raise SandboxError(
- "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
- ) from e
+ raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
- with self._get_context().messenger.timed_activity(
- "Uploading input root", element_name=self._get_element_name()
- ):
- # Determine blobs missing on remote
- try:
- input_root_digest = action.input_root_digest
- missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
- except grpc.RpcError as e:
- raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
+ # Check if any blobs are also missing locally (partial artifact)
+ # and pull them from the artifact cache.
+ try:
+ local_missing_blobs = cascache.missing_blobs(missing_blobs)
+ if local_missing_blobs:
+ artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+ except (grpc.RpcError, BstError) as e:
+ raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
- # Check if any blobs are also missing locally (partial artifact)
- # and pull them from the artifact cache.
- try:
- local_missing_blobs = cascache.missing_blobs(missing_blobs)
- if local_missing_blobs:
- artifactcache.fetch_missing_blobs(project, local_missing_blobs)
- except (grpc.RpcError, BstError) as e:
- raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+ # Add command and action messages to blob list to push
+ missing_blobs.append(action.command_digest)
+ missing_blobs.append(action_digest)
- # Add command and action messages to blob list to push
- missing_blobs.append(action.command_digest)
- missing_blobs.append(action_digest)
-
- # Now, push the missing blobs to the remote.
- try:
- cascache.send_blobs(casremote, missing_blobs)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+ # Now, push the missing blobs to the remote.
+ try:
+ cascache.send_blobs(casremote, missing_blobs)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
# Now request to execute the action
channel = self.exec_spec.open_channel()
@@ -222,17 +214,16 @@
action_result = self._extract_action_result(operation)
# Fetch outputs
- with CASRemote(self.storage_spec, cascache) as casremote:
- for output_directory in action_result.output_directories:
- tree_digest = output_directory.tree_digest
- if tree_digest is None or not tree_digest.hash:
- raise SandboxError("Output directory structure had no digest attached.")
+ for output_directory in action_result.output_directories:
+ tree_digest = output_directory.tree_digest
+ if tree_digest is None or not tree_digest.hash:
+ raise SandboxError("Output directory structure had no digest attached.")
- # Now do a pull to ensure we have the full directory structure.
- cascache.pull_tree(casremote, tree_digest)
+ # Now do a pull to ensure we have the full directory structure.
+ cascache.pull_tree(casremote, tree_digest)
- # Fetch stdout and stderr blobs
- cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
+ # Fetch stdout and stderr blobs
+ cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
# Forward remote stdout and stderr
if stdout:
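
The sandbox now selects its CAS endpoint once, at construction time: an
explicit remote-execution `storage-service` wins, otherwise it falls back to
casd's default remote, which already proxies the cache `storage-service`.
The fallback in isolation, with a stub callable for opening a remote:

def select_storage_remote(storage_spec, *, open_remote, default_remote):
    # Mirrors SandboxRemote.__init__ above: prefer an explicit
    # remote-execution storage-service; otherwise reuse the default remote
    # that buildbox-casd already connects to (the cache storage-service).
    if storage_spec:
        return open_remote(storage_spec)
    return default_remote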
diff --git a/tests/frontend/remote-caches.py b/tests/frontend/remote-caches.py
index 103af10..4aee857 100644
--- a/tests/frontend/remote-caches.py
+++ b/tests/frontend/remote-caches.py
@@ -36,6 +36,36 @@
@pytest.mark.datafiles(DATA_DIR)
+def test_build_checkout(cli, tmpdir, datafiles):
+ cachedir = os.path.join(str(tmpdir), "cache")
+ project = str(datafiles)
+ checkout = os.path.join(cli.directory, "checkout")
+
+ with create_artifact_share(os.path.join(str(tmpdir), "remote-cache")) as remote_cache:
+ # Enable remote cache
+ cli.configure({"cache": {"storage-service": {"url": remote_cache.repo}}})
+
+ # First build it
+ result = cli.run(project=project, args=["build", "target.bst"])
+ result.assert_success()
+
+ # Discard the local CAS cache
+ shutil.rmtree(str(os.path.join(cachedir, "cas")))
+
+ # Now check it out, this should automatically fetch the necessary blobs
+ # from the remote cache
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkout])
+ result.assert_success()
+
+ # Check that the executable hello file is found in the checkout
+ filename = os.path.join(checkout, "usr", "bin", "hello")
+ assert os.path.exists(filename)
+
+ filename = os.path.join(checkout, "usr", "include", "pony.h")
+ assert os.path.exists(filename)
+
+
+@pytest.mark.datafiles(DATA_DIR)
def test_source_artifact_caches(cli, tmpdir, datafiles):
cachedir = os.path.join(str(tmpdir), "cache")
project_dir = str(datafiles)
diff --git a/tests/remoteexecution/partial.py b/tests/remoteexecution/partial.py
deleted file mode 100644
index a47abee..0000000
--- a/tests/remoteexecution/partial.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Pylint doesn't play well with fixtures and dependency injection from pytest
-# pylint: disable=redefined-outer-name
-
-import os
-import pytest
-
-from buildstream.exceptions import ErrorDomain
-from buildstream.testing import cli_remote_execution as cli # pylint: disable=unused-import
-from buildstream.testing.integration import assert_contains
-
-from tests.testutils.artifactshare import create_artifact_share
-
-
-pytestmark = pytest.mark.remoteexecution
-
-
-DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project")
-
-
-# Test that `bst build` does not download file blobs of a build-only dependency
-# to the local cache.
-@pytest.mark.datafiles(DATA_DIR)
-@pytest.mark.parametrize("pull_artifact_files", [True, False])
-@pytest.mark.parametrize("build_all", [True, False])
-def test_build_dependency_partial_local_cas(cli, datafiles, pull_artifact_files, build_all):
- project = str(datafiles)
- element_name = "no-runtime-deps.bst"
- builddep_element_name = "autotools/amhello.bst"
- checkout = os.path.join(cli.directory, "checkout")
- builddep_checkout = os.path.join(cli.directory, "builddep-checkout")
-
- services = cli.ensure_services()
- assert set(services) == set(["action-cache", "execution", "storage"])
-
- # configure pull blobs
- if build_all:
- cli.configure({"build": {"dependencies": "all"}})
- cli.config["remote-execution"]["pull-artifact-files"] = pull_artifact_files
-
- result = cli.run(project=project, args=["build", element_name])
- result.assert_success()
-
- # Verify artifact is pulled bar files when ensure artifact files is set
- result = cli.run(project=project, args=["artifact", "checkout", element_name, "--directory", checkout])
- if pull_artifact_files:
- result.assert_success()
- assert_contains(checkout, ["/test"])
- else:
- result.assert_main_error(ErrorDomain.STREAM, "uncached-checkout-attempt")
-
- # Verify build dependencies is pulled for ALL and BUILD
- result = cli.run(
- project=project, args=["artifact", "checkout", builddep_element_name, "--directory", builddep_checkout]
- )
- if build_all and pull_artifact_files:
- result.assert_success()
- else:
- result.assert_main_error(ErrorDomain.STREAM, "uncached-checkout-attempt")
-
-
-@pytest.mark.datafiles(DATA_DIR)
-def test_build_partial_push(cli, tmpdir, datafiles):
- project = str(datafiles)
- share_dir = os.path.join(str(tmpdir), "artifactshare")
- element_name = "no-runtime-deps.bst"
- builddep_element_name = "autotools/amhello.bst"
-
- with create_artifact_share(share_dir) as share:
-
- services = cli.ensure_services()
- assert set(services) == set(["action-cache", "execution", "storage"])
-
- cli.config["artifacts"] = {"servers": [{"url": share.repo, "push": True,}]}
-
- res = cli.run(project=project, args=["build", element_name])
- res.assert_success()
-
- assert builddep_element_name in res.get_pushed_elements()
diff --git a/tests/remoteexecution/remotecache.py b/tests/remoteexecution/remotecache.py
new file mode 100644
index 0000000..ee3465e
--- /dev/null
+++ b/tests/remoteexecution/remotecache.py
@@ -0,0 +1,62 @@
+# Pylint doesn't play well with fixtures and dependency injection from pytest
+# pylint: disable=redefined-outer-name
+
+import copy
+import os
+import pytest
+
+from buildstream.exceptions import ErrorDomain
+from buildstream.testing import cli_remote_execution as cli # pylint: disable=unused-import
+from buildstream.testing.integration import assert_contains
+
+
+pytestmark = pytest.mark.remoteexecution
+
+
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project")
+
+
+# Test building an executable with remote-execution and remote-cache enabled
+@pytest.mark.datafiles(DATA_DIR)
+def test_remote_autotools_build(cli, datafiles, remote_services):
+ project = str(datafiles)
+ checkout = os.path.join(cli.directory, "checkout")
+ element_name = "autotools/amhello.bst"
+
+ services = cli.ensure_services()
+ assert set(services) == set(["action-cache", "execution", "storage"])
+
+ # Enable remote cache and remove explicit remote execution CAS configuration.
+ config_without_remote_cache = copy.deepcopy(cli.config)
+ cli.configure({"cache": {"storage-service": {"url": remote_services.storage_service}}})
+ del cli.config["remote-execution"]["storage-service"]
+ config_with_remote_cache = cli.config
+
+ # Build element with remote execution.
+ result = cli.run(project=project, args=["build", element_name])
+ result.assert_success()
+
+ # Attempt checkout from local cache by temporarily disabling remote cache.
+ # This should fail as the build result shouldn't have been downloaded to the local cache.
+ cli.config = config_without_remote_cache
+ result = cli.run(project=project, args=["artifact", "checkout", element_name, "--directory", checkout])
+ result.assert_main_error(ErrorDomain.STREAM, "uncached-checkout-attempt")
+ cli.config = config_with_remote_cache
+
+ # Attempt checkout again with remote cache.
+ result = cli.run(project=project, args=["artifact", "checkout", element_name, "--directory", checkout])
+ result.assert_success()
+
+ assert_contains(
+ checkout,
+ [
+ "/usr",
+ "/usr/lib",
+ "/usr/bin",
+ "/usr/share",
+ "/usr/bin/hello",
+ "/usr/share/doc",
+ "/usr/share/doc/amhello",
+ "/usr/share/doc/amhello/README",
+ ],
+ )