_sandboxremote.py: Make storage-service optional with remote cache

When a remote cache is configured in the `cache` section of the user
configuration, the `storage-service` key of the `remote-execution`
section becomes optional: the remote cache endpoint is inherited as
the CAS storage service for remote execution. If neither is
configured, loading the configuration now fails with an explicit
error instead of crashing on a missing key.
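
As an illustration, a minimal sketch of a user configuration that
relies on this inheritance could look like the following (the endpoint
URLs are hypothetical):

    # buildstream.conf excerpt: with a remote cache configured under
    # 'cache', the 'remote-execution' section no longer needs its own
    # 'storage-service' key; the cache endpoint is reused as the CAS
    # storage service for remote execution.
    cache:
      storage-service:
        url: https://cache.example.com:11001

    remote-execution:
      execution-service:
        url: https://controller.example.com:50051
      # 'storage-service' may now be omitted here
      action-cache-service:
        url: https://controller.example.com:50052

If `storage-service` is specified in both places, the one under
`remote-execution` still takes precedence, as reflected in the
frontend widget change below.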
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index fc74745..540f376 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -749,4 +749,4 @@
             os.environ["XDG_DATA_HOME"] = os.path.expanduser("~/.local/share")
 
     def _load_remote_execution(self, node: MappingNode) -> Optional[RemoteExecutionSpec]:
-        return RemoteExecutionSpec.new_from_node(node)
+        return RemoteExecutionSpec.new_from_node(node, remote_cache=bool(self.remote_cache_spec))
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 715f940..4bdf7c9 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -501,7 +501,8 @@
             text += self.content_profile.fmt("Remote Execution Configuration\n", bold=True)
             values = OrderedDict()
             values["Execution Service"] = format_spec(specs.exec_spec)
-            values["Storage Service"] = format_spec(specs.storage_spec)
+            re_storage_spec = specs.storage_spec or context.remote_cache_spec
+            values["Storage Service"] = format_spec(re_storage_spec)
             if specs.action_spec:
                 values["Action Cache Service"] = format_spec(specs.action_spec)
             text += self._format_values(values)
diff --git a/src/buildstream/_remotespec.py b/src/buildstream/_remotespec.py
index a37698e..0e27217 100644
--- a/src/buildstream/_remotespec.py
+++ b/src/buildstream/_remotespec.py
@@ -454,9 +454,11 @@
 # communicate with various components of an RE build cluster.
 #
 class RemoteExecutionSpec:
-    def __init__(self, exec_spec: RemoteSpec, storage_spec: RemoteSpec, action_spec: Optional[RemoteSpec]) -> None:
+    def __init__(
+        self, exec_spec: RemoteSpec, storage_spec: Optional[RemoteSpec], action_spec: Optional[RemoteSpec]
+    ) -> None:
         self.exec_spec: RemoteSpec = exec_spec
-        self.storage_spec: RemoteSpec = storage_spec
+        self.storage_spec: Optional[RemoteSpec] = storage_spec
         self.action_spec: Optional[RemoteSpec] = action_spec
 
     # new_from_node():
@@ -474,15 +476,30 @@
     #    LoadError: If the node is malformed.
     #
     @classmethod
-    def new_from_node(cls, node: MappingNode, basedir: Optional[str] = None) -> "RemoteExecutionSpec":
+    def new_from_node(
+        cls, node: MappingNode, basedir: Optional[str] = None, *, remote_cache: bool = False
+    ) -> "RemoteExecutionSpec":
         node.validate_keys(["execution-service", "storage-service", "action-cache-service"])
 
         exec_node = node.get_mapping("execution-service")
-        storage_node = node.get_mapping("storage-service")
+        storage_node = node.get_mapping("storage-service", default=None)
+        if not storage_node and not remote_cache:
+            provenance = node.get_provenance()
+            raise LoadError(
+                "{}: Remote execution requires 'storage-service' to be specified in the 'remote-execution' section if not already specified globally in the 'cache' section".format(
+                    provenance
+                ),
+                LoadErrorReason.INVALID_DATA,
+            )
         action_node = node.get_mapping("action-cache-service", default=None)
 
         exec_spec = RemoteSpec.new_from_node(exec_node, basedir, remote_execution=True)
-        storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+
+        storage_spec: Optional[RemoteSpec]
+        if storage_node:
+            storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+        else:
+            storage_spec = None
 
         action_spec: Optional[RemoteSpec]
         if action_node:
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index e0a3a2d..d1e1c8f 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -42,6 +42,8 @@
         super().__init__(*args, **kwargs)
 
         context = self._get_context()
+        cascache = context.get_cascache()
+
         specs = context.remote_execution_specs
         if specs is None:
             return
@@ -51,6 +53,17 @@
         self.action_spec = specs.action_spec
         self.operation_name = None
 
+        if self.storage_spec:
+            self.storage_remote = CASRemote(self.storage_spec, cascache)
+            try:
+                self.storage_remote.init()
+            except grpc.RpcError as e:
+                raise SandboxError(
+                    "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
+                ) from e
+        else:
+            self.storage_remote = cascache.get_default_remote()
+
     def run_remote_command(self, channel, action_digest):
         # Sends an execution request to the remote execution server.
         #
@@ -141,13 +154,13 @@
         cascache = context.get_cascache()
 
         # Fetch the file blobs
-        dir_digest = vdir._get_digest()
-        required_blobs = cascache.required_blobs_for_directory(dir_digest)
+        if self.storage_spec:
+            dir_digest = vdir._get_digest()
+            required_blobs = cascache.required_blobs_for_directory(dir_digest)
 
-        local_missing_blobs = cascache.missing_blobs(required_blobs)
-        if local_missing_blobs:
-            with CASRemote(self.storage_spec, cascache) as casremote:
-                cascache.fetch_blobs(casremote, local_missing_blobs)
+            local_missing_blobs = cascache.missing_blobs(required_blobs)
+            if local_missing_blobs:
+                cascache.fetch_blobs(self.storage_remote, local_missing_blobs)
 
     def _execute_action(self, action, flags):
         stdout, stderr = self._get_output()
@@ -159,46 +172,40 @@
 
         action_digest = cascache.add_object(buffer=action.SerializeToString())
 
+        casremote = self.storage_remote
+
         # check action cache download and download if there
         action_result = self._check_action_cache(action_digest)
 
         if not action_result:
-            with CASRemote(self.storage_spec, cascache) as casremote:
+            with self._get_context().messenger.timed_activity(
+                "Uploading input root", element_name=self._get_element_name()
+            ):
+                # Determine blobs missing on remote
                 try:
-                    casremote.init()
+                    input_root_digest = action.input_root_digest
+                    missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
                 except grpc.RpcError as e:
-                    raise SandboxError(
-                        "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
-                    ) from e
+                    raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
 
-                with self._get_context().messenger.timed_activity(
-                    "Uploading input root", element_name=self._get_element_name()
-                ):
-                    # Determine blobs missing on remote
-                    try:
-                        input_root_digest = action.input_root_digest
-                        missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
-                    except grpc.RpcError as e:
-                        raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
+                # Check if any blobs are also missing locally (partial artifact)
+                # and pull them from the artifact cache.
+                try:
+                    local_missing_blobs = cascache.missing_blobs(missing_blobs)
+                    if local_missing_blobs:
+                        artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+                except (grpc.RpcError, BstError) as e:
+                    raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
 
-                    # Check if any blobs are also missing locally (partial artifact)
-                    # and pull them from the artifact cache.
-                    try:
-                        local_missing_blobs = cascache.missing_blobs(missing_blobs)
-                        if local_missing_blobs:
-                            artifactcache.fetch_missing_blobs(project, local_missing_blobs)
-                    except (grpc.RpcError, BstError) as e:
-                        raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+                # Add command and action messages to blob list to push
+                missing_blobs.append(action.command_digest)
+                missing_blobs.append(action_digest)
 
-                    # Add command and action messages to blob list to push
-                    missing_blobs.append(action.command_digest)
-                    missing_blobs.append(action_digest)
-
-                    # Now, push the missing blobs to the remote.
-                    try:
-                        cascache.send_blobs(casremote, missing_blobs)
-                    except grpc.RpcError as e:
-                        raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+                # Now, push the missing blobs to the remote.
+                try:
+                    cascache.send_blobs(casremote, missing_blobs)
+                except grpc.RpcError as e:
+                    raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
 
             # Now request to execute the action
             channel = self.exec_spec.open_channel()
@@ -207,17 +214,16 @@
                 action_result = self._extract_action_result(operation)
 
         # Fetch outputs
-        with CASRemote(self.storage_spec, cascache) as casremote:
-            for output_directory in action_result.output_directories:
-                tree_digest = output_directory.tree_digest
-                if tree_digest is None or not tree_digest.hash:
-                    raise SandboxError("Output directory structure had no digest attached.")
+        for output_directory in action_result.output_directories:
+            tree_digest = output_directory.tree_digest
+            if tree_digest is None or not tree_digest.hash:
+                raise SandboxError("Output directory structure had no digest attached.")
 
-                # Now do a pull to ensure we have the full directory structure.
-                cascache.pull_tree(casremote, tree_digest)
+            # Now do a pull to ensure we have the full directory structure.
+            cascache.pull_tree(casremote, tree_digest)
 
-            # Fetch stdout and stderr blobs
-            cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
+        # Fetch stdout and stderr blobs
+        cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
 
         # Forward remote stdout and stderr
         if stdout: