_sandboxremote.py: Make storage-service optional with remote cache
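When `storage-service` is omitted from the `remote-execution` section,
inherit the storage configuration from the `cache` configuration section.
Loading fails with an error if neither section provides one.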
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index fc74745..540f376 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -749,4 +749,4 @@
os.environ["XDG_DATA_HOME"] = os.path.expanduser("~/.local/share")
def _load_remote_execution(self, node: MappingNode) -> Optional[RemoteExecutionSpec]:
- return RemoteExecutionSpec.new_from_node(node)
+ return RemoteExecutionSpec.new_from_node(node, remote_cache=bool(self.remote_cache_spec))
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 715f940..4bdf7c9 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -501,7 +501,8 @@
text += self.content_profile.fmt("Remote Execution Configuration\n", bold=True)
values = OrderedDict()
values["Execution Service"] = format_spec(specs.exec_spec)
- values["Storage Service"] = format_spec(specs.storage_spec)
+ re_storage_spec = specs.storage_spec or context.remote_cache_spec
+ values["Storage Service"] = format_spec(re_storage_spec)
if specs.action_spec:
values["Action Cache Service"] = format_spec(specs.action_spec)
text += self._format_values(values)
diff --git a/src/buildstream/_remotespec.py b/src/buildstream/_remotespec.py
index a37698e..0e27217 100644
--- a/src/buildstream/_remotespec.py
+++ b/src/buildstream/_remotespec.py
@@ -454,9 +454,11 @@
# communicate with various components of an RE build cluster.
#
class RemoteExecutionSpec:
- def __init__(self, exec_spec: RemoteSpec, storage_spec: RemoteSpec, action_spec: Optional[RemoteSpec]) -> None:
+ def __init__(
+ self, exec_spec: RemoteSpec, storage_spec: Optional[RemoteSpec], action_spec: Optional[RemoteSpec]
+ ) -> None:
self.exec_spec: RemoteSpec = exec_spec
- self.storage_spec: RemoteSpec = storage_spec
+ self.storage_spec: Optional[RemoteSpec] = storage_spec
self.action_spec: Optional[RemoteSpec] = action_spec
# new_from_node():
@@ -474,15 +476,30 @@
# LoadError: If the node is malformed.
#
@classmethod
- def new_from_node(cls, node: MappingNode, basedir: Optional[str] = None) -> "RemoteExecutionSpec":
+ def new_from_node(
+ cls, node: MappingNode, basedir: Optional[str] = None, *, remote_cache: bool = False
+ ) -> "RemoteExecutionSpec":
node.validate_keys(["execution-service", "storage-service", "action-cache-service"])
exec_node = node.get_mapping("execution-service")
- storage_node = node.get_mapping("storage-service")
+ storage_node = node.get_mapping("storage-service", default=None)
+ if not storage_node and not remote_cache:
+ provenance = node.get_provenance()
+ raise LoadError(
+ "{}: Remote execution requires 'storage-service' to be specified in the 'remote-execution' section if not already specified globally in the 'cache' section".format(
+ provenance
+ ),
+ LoadErrorReason.INVALID_DATA,
+ )
action_node = node.get_mapping("action-cache-service", default=None)
exec_spec = RemoteSpec.new_from_node(exec_node, basedir, remote_execution=True)
- storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+
+ storage_spec: Optional[RemoteSpec]
+ if storage_node:
+ storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+ else:
+ storage_spec = None
action_spec: Optional[RemoteSpec]
if action_node:
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index e0a3a2d..d1e1c8f 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -42,6 +42,8 @@
super().__init__(*args, **kwargs)
context = self._get_context()
+ cascache = context.get_cascache()
+
specs = context.remote_execution_specs
if specs is None:
return
@@ -51,6 +53,17 @@
self.action_spec = specs.action_spec
self.operation_name = None
+ if self.storage_spec:
+ self.storage_remote = CASRemote(self.storage_spec, cascache)
+ try:
+ self.storage_remote.init()
+ except grpc.RpcError as e:
+ raise SandboxError(
+ "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
+ ) from e
+ else:
+ self.storage_remote = cascache.get_default_remote()
+
def run_remote_command(self, channel, action_digest):
# Sends an execution request to the remote execution server.
#
@@ -141,13 +154,13 @@
cascache = context.get_cascache()
# Fetch the file blobs
- dir_digest = vdir._get_digest()
- required_blobs = cascache.required_blobs_for_directory(dir_digest)
+ if self.storage_spec:
+ dir_digest = vdir._get_digest()
+ required_blobs = cascache.required_blobs_for_directory(dir_digest)
- local_missing_blobs = cascache.missing_blobs(required_blobs)
- if local_missing_blobs:
- with CASRemote(self.storage_spec, cascache) as casremote:
- cascache.fetch_blobs(casremote, local_missing_blobs)
+ local_missing_blobs = cascache.missing_blobs(required_blobs)
+ if local_missing_blobs:
+ cascache.fetch_blobs(self.storage_remote, local_missing_blobs)
def _execute_action(self, action, flags):
stdout, stderr = self._get_output()
@@ -159,46 +172,40 @@
action_digest = cascache.add_object(buffer=action.SerializeToString())
+ casremote = self.storage_remote
+
# check action cache download and download if there
action_result = self._check_action_cache(action_digest)
if not action_result:
- with CASRemote(self.storage_spec, cascache) as casremote:
+ with self._get_context().messenger.timed_activity(
+ "Uploading input root", element_name=self._get_element_name()
+ ):
+ # Determine blobs missing on remote
try:
- casremote.init()
+ input_root_digest = action.input_root_digest
+ missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
except grpc.RpcError as e:
- raise SandboxError(
- "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
- ) from e
+ raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
- with self._get_context().messenger.timed_activity(
- "Uploading input root", element_name=self._get_element_name()
- ):
- # Determine blobs missing on remote
- try:
- input_root_digest = action.input_root_digest
- missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
- except grpc.RpcError as e:
- raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
+ # Check if any blobs are also missing locally (partial artifact)
+ # and pull them from the artifact cache.
+ try:
+ local_missing_blobs = cascache.missing_blobs(missing_blobs)
+ if local_missing_blobs:
+ artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+ except (grpc.RpcError, BstError) as e:
+ raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
- # Check if any blobs are also missing locally (partial artifact)
- # and pull them from the artifact cache.
- try:
- local_missing_blobs = cascache.missing_blobs(missing_blobs)
- if local_missing_blobs:
- artifactcache.fetch_missing_blobs(project, local_missing_blobs)
- except (grpc.RpcError, BstError) as e:
- raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+ # Add command and action messages to blob list to push
+ missing_blobs.append(action.command_digest)
+ missing_blobs.append(action_digest)
- # Add command and action messages to blob list to push
- missing_blobs.append(action.command_digest)
- missing_blobs.append(action_digest)
-
- # Now, push the missing blobs to the remote.
- try:
- cascache.send_blobs(casremote, missing_blobs)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+ # Now, push the missing blobs to the remote.
+ try:
+ cascache.send_blobs(casremote, missing_blobs)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
# Now request to execute the action
channel = self.exec_spec.open_channel()
@@ -207,17 +214,16 @@
action_result = self._extract_action_result(operation)
# Fetch outputs
- with CASRemote(self.storage_spec, cascache) as casremote:
- for output_directory in action_result.output_directories:
- tree_digest = output_directory.tree_digest
- if tree_digest is None or not tree_digest.hash:
- raise SandboxError("Output directory structure had no digest attached.")
+ for output_directory in action_result.output_directories:
+ tree_digest = output_directory.tree_digest
+ if tree_digest is None or not tree_digest.hash:
+ raise SandboxError("Output directory structure had no digest attached.")
- # Now do a pull to ensure we have the full directory structure.
- cascache.pull_tree(casremote, tree_digest)
+ # Now do a pull to ensure we have the full directory structure.
+ cascache.pull_tree(casremote, tree_digest)
- # Fetch stdout and stderr blobs
- cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
+ # Fetch stdout and stderr blobs
+ cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
# Forward remote stdout and stderr
if stdout: