| # |
| # Copyright (C) 2018-2019 Bloomberg Finance LP |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this library. If not, see <http://www.gnu.org/licenses/>. |
| |
| import os |
| import shlex |
| |
| from .sandbox import Sandbox, SandboxFlags, SandboxCommandError, _SandboxBatch |
| from .. import utils |
| from .._exceptions import ImplError, SandboxError |
| from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 |
| |
| |
| # SandboxREAPI() |
| # |
| # Abstract class providing a skeleton for sandbox implementations based on |
| # the Remote Execution API. |
| # |
class SandboxREAPI(Sandbox):
    """Abstract sandbox skeleton based on the Remote Execution API (REAPI).

    This class stages the sandbox's virtual directory as the REAPI input
    root, builds the ``Command`` and ``Action`` protos, delegates actual
    execution to the subclass via ``_execute_action()`` and merges the
    returned output directories back into the virtual directory.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Optional node property names to request for output paths; forwarded
        # verbatim into the REAPI Command proto (None if not specified).
        self._output_node_properties = kwargs.get("output_node_properties")

    # _run()
    #
    # Run a command in the sandbox via the Remote Execution API.
    #
    # Args:
    #    command (list): The command argv to run
    #    flags (SandboxFlags): Flags for this invocation
    #    cwd (str): Absolute path of the working directory
    #    env (dict): Environment variables for the command
    #
    # Returns:
    #    (int): The exit code of the command
    #
    # Raises:
    #    SandboxCommandError: If the staged artifacts do not provide command[0]
    #
    def _run(self, command, flags, *, cwd, env):
        context = self._get_context()
        cascache = context.get_cascache()

        # Set up virtual directory
        vdir = self.get_virtual_directory()

        if not self._has_command(command[0], env):
            raise SandboxCommandError(
                "Staged artifacts do not provide command " "'{}'".format(command[0]), reason="missing-command"
            )

        # Ensure working directory exists (cwd == "/" needs no descend)
        if len(cwd) > 1:
            assert cwd.startswith("/")
            vdir.descend(*cwd[1:].split(os.path.sep), create=True)

        # Ensure directories required for sandboxed execution exist
        for directory in ["dev", "proc", "tmp"]:
            vsubdir = vdir.descend(directory, create=True)
            # Keep these writable even when the root is requested read-only,
            # as they back special filesystems / scratch space at runtime
            if flags & SandboxFlags.ROOT_READ_ONLY:
                vsubdir._set_subtree_read_only(False)

        # Create directories for all marked directories. This emulates
        # some of the behaviour of other sandboxes, which create these
        # to use as mount points.
        read_write_directories = []
        mount_sources = self._get_mount_sources()
        for mark in self._get_marked_directories():
            directory = mark["directory"]

            if directory in mount_sources:
                # Bind mount
                mount_point = directory
                mount_source = mount_sources[mount_point]

                # Ensure mount point exists in sandbox
                mount_point_components = mount_point.split(os.path.sep)
                if not vdir.exists(*mount_point_components):
                    if os.path.isdir(mount_source):
                        # Mounting a directory, mount point must be a directory
                        vdir.descend(*mount_point_components, create=True)
                    else:
                        # Mounting a file or device node, mount point must be a file
                        parent_vdir = vdir.descend(*mount_point_components[:-1], create=True)
                        parent_vdir._create_empty_file(mount_point_components[-1])
            else:
                # Read-write directory: requested back as a REAPI output
                # directory so modifications survive the round trip
                marked_vdir = vdir.descend(*directory.split(os.path.sep), create=True)
                read_write_directories.append(directory)
                if flags & SandboxFlags.ROOT_READ_ONLY:
                    marked_vdir._set_subtree_read_only(False)

        if flags & SandboxFlags.ROOT_READ_ONLY:
            vdir._set_subtree_read_only(True)
        else:
            # The whole sandbox is writable
            read_write_directories = [os.path.sep]

        # Generate Action proto
        input_root_digest = vdir._get_digest()
        command_proto = self._create_command(command, cwd, env, read_write_directories, flags)
        command_digest = cascache.add_object(buffer=command_proto.SerializeToString())
        action = remote_execution_pb2.Action(command_digest=command_digest, input_root_digest=input_root_digest)

        action_result = self._execute_action(action, flags)  # pylint: disable=assignment-from-no-return

        # Get output of build
        self._process_job_output(
            cwd, action_result.output_directories, action_result.output_files, failure=action_result.exit_code != 0
        )

        # Non-zero exit code means a normal error during the build:
        # the remote execution system has worked correctly but the command failed.
        return action_result.exit_code

    # _create_command()
    #
    # Create a REAPI Command proto from the command line, working
    # directory, environment and platform configuration.
    #
    # Args:
    #    command (list): The command argv
    #    working_directory (str): Absolute working directory path
    #    environment (dict): Environment variables for the command
    #    read_write_directories (list): Directories to request as REAPI outputs
    #    flags (SandboxFlags): Flags for this invocation
    #
    # Returns:
    #    (remote_execution_pb2.Command): The populated Command proto
    #
    # Raises:
    #    SandboxError: If remote-execution platform properties conflict
    #                  with properties derived from the sandbox config
    #
    def _create_command(self, command, working_directory, environment, read_write_directories, flags):
        # Creates a command proto
        environment_variables = [
            remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v) for (k, v) in environment.items()
        ]

        # Request read-write directories as output (paths in the Command
        # proto are relative to the working directory)
        output_directories = [os.path.relpath(dir, start=working_directory) for dir in read_write_directories]

        # Get the SandboxConfig
        config = self._get_config()
        default_dict = {}
        default_dict["OSFamily"] = config.build_os
        default_dict["ISA"] = config.build_arch

        if flags & SandboxFlags.INHERIT_UID:
            # Use the effective uid/gid of the local user
            uid = os.geteuid()
            gid = os.getegid()
        else:
            uid = config.build_uid
            gid = config.build_gid
        if uid is not None:
            default_dict["unixUID"] = str(uid)
        if gid is not None:
            default_dict["unixGID"] = str(gid)

        if flags & SandboxFlags.NETWORK_ENABLED:
            default_dict["network"] = "on"
        # Remove unsupported platform properties from the default dict, this filter is derived from the
        # local sandbox capabilities
        supported_properties = self._supported_platform_properties()
        platform_dict = {key: value for (key, value) in default_dict.items() if key in supported_properties}

        # Get the platform properties dict, if specified. These are not filtered as they are specific
        # to the remote server
        platform_properties = self._get_platform_properties()

        # Apply the properties to the default_dict. k:v pairs in the default_dict
        # can be disabled if given an explicit value of `[]` in platform properties
        # with a matching key.
        for platform_property, value in platform_properties.items():
            if platform_property in platform_dict:
                if value != []:
                    raise SandboxError(
                        "Platform Property {}:{} should be configured in sandbox config, not remote-execution.".format(
                            platform_property, value
                        ),
                        reason="invalid-platform-property",
                    )
                del platform_dict[platform_property]
            else:
                platform_dict[platform_property] = value

        # Create Platform message with properties sorted by name in code point order
        platform = remote_execution_pb2.Platform()
        for key, value in sorted(platform_dict.items()):
            platform.properties.add(name=key, value=value)

        return remote_execution_pb2.Command(
            arguments=command,
            working_directory=working_directory[1:],
            environment_variables=environment_variables,
            output_files=[],
            output_directories=output_directories,
            output_node_properties=self._output_node_properties,
            platform=platform,
        )

    # _process_job_output()
    #
    # Reads the remote execution server response to an execution request
    # and applies the returned output directories to the sandbox's
    # virtual directory.
    #
    # Args:
    #    working_directory (str): Absolute working directory of the command
    #    output_directories (list): Array of OutputDirectory objects
    #    output_files (list): Array of OutputFile objects (expected empty,
    #                         as no output files are ever requested)
    #    failure (bool): Whether the command exited with a non-zero code
    #
    # Raises:
    #    SandboxError: If output files were returned, or an output
    #                  directory carries no tree digest
    #
    def _process_job_output(self, working_directory, output_directories, output_files, *, failure):
        if output_files:
            raise SandboxError("Output files were returned when we didn't request any.")

        context = self._get_context()
        cascache = context.get_cascache()
        vdir = self.get_virtual_directory()

        for output_directory in output_directories:
            tree_digest = output_directory.tree_digest
            if tree_digest is None or not tree_digest.hash:
                raise SandboxError("Output directory structure had no digest attached.")

            # Get digest of output directory from tree digest
            tree = remote_execution_pb2.Tree()
            with open(cascache.objpath(tree_digest), "rb") as f:
                tree.ParseFromString(f.read())
            root_directory = tree.root.SerializeToString()
            dir_digest = utils._message_digest(root_directory)

            # Create a normalized absolute path (inside the input tree)
            path = os.path.normpath(os.path.join(working_directory, output_directory.path))

            # Get virtual directory at the path of the output directory
            # NOTE(review): the absolute path yields an empty first component
            # from split(); presumably descend() tolerates that — confirm
            vsubdir = vdir.descend(*path.split(os.path.sep), create=True)

            # Replace contents with returned output
            vsubdir._reset(digest=dir_digest)

    # _create_batch()
    #
    # Create a command batch object for this sandbox, which batches
    # commands into a single generated shell script.
    #
    def _create_batch(self, main_group, flags, *, collect=None):
        return _SandboxREAPIBatch(self, main_group, flags, collect=collect)

    # _execute_action()
    #
    # Execute the given REAPI Action and return its ActionResult.
    # Must be overridden by concrete subclasses.
    #
    # Raises:
    #    ImplError: Always, unless overridden
    #
    def _execute_action(self, action, flags):
        raise ImplError("Sandbox of type '{}' does not implement _execute_action()".format(type(self).__name__))

    # _supported_platform_properties()
    #
    # The set of platform property names supported by this sandbox;
    # used to filter the default platform dict in _create_command().
    #
    def _supported_platform_properties(self):
        return {"OSFamily", "ISA"}

    # _get_platform_properties()
    #
    # Remote-specific platform properties; subclasses may override to
    # supply extra properties for the remote execution server.
    #
    def _get_platform_properties(self):
        return {}
| |
| |
| # _SandboxREAPIBatch() |
| # |
| # Command batching by shell script generation. |
| # |
class _SandboxREAPIBatch(_SandboxBatch):
    """Command batching by shell script generation.

    Visits the batch's command tree, accumulating a single POSIX shell
    script which is then executed in one sandbox invocation via
    ``sh -c -e``.
    """

    def __init__(self, sandbox, main_group, flags, *, collect=None):
        super().__init__(sandbox, main_group, flags, collect=collect)

        # Accumulated shell script text (built in execute())
        self.script = None
        # First command in the batch; supplies the initial cwd/env
        self.first_command = None
        # Working directory tracked while generating the script
        self.cwd = None
        # Environment tracked while generating the script
        self.env = None

    # execute()
    #
    # Generate the batch script by visiting the command tree, then run
    # it as a single `sh -c -e` invocation in the sandbox.
    #
    # Raises:
    #    SandboxCommandError: If the script exits with a non-zero code
    #
    def execute(self):
        self.script = ""

        # Visiting the tree appends to self.script via execute_command()
        self.main_group.execute(self)

        first = self.first_command
        if first:
            context = self.sandbox._get_context()
            with context.messenger.timed_activity(
                "Running commands",
                detail=self.main_group.combined_label(),
                element_name=self.sandbox._get_element_name(),
            ):
                # Initial working directory and environment come from the
                # first command; the script adjusts them for later commands.
                if self.sandbox.run(["sh", "-c", "-e", self.script], self.flags, cwd=first.cwd, env=first.env) != 0:
                    raise SandboxCommandError("Command failed", collect=self.collect)

    def execute_group(self, group):
        group.execute_children(self)

    # execute_command()
    #
    # Append a single command to the batch script, emitting cwd/env
    # adjustments as needed before the command itself.
    #
    def execute_command(self, command):
        if self.first_command is None:
            # First command in batch
            # Initial working directory and environment of script already matches
            # the command configuration.
            self.first_command = command
        else:
            # Change working directory for this command.
            # Fix: quote the path so a cwd containing spaces or shell
            # metacharacters does not break the generated script
            # (all other interpolations here already use shlex.quote).
            if command.cwd != self.cwd:
                quoted_cwd = shlex.quote(command.cwd)
                self.script += "mkdir -p {}\n".format(quoted_cwd)
                self.script += "cd {}\n".format(quoted_cwd)

            # Update environment for this command: drop variables no
            # longer present, then (re)export changed or new ones
            for key in self.env.keys():
                if key not in command.env:
                    self.script += "unset {}\n".format(key)
            for key, value in command.env.items():
                if key not in self.env or self.env[key] != value:
                    self.script += "export {}={}\n".format(key, shlex.quote(value))

        # Keep track of current working directory and environment
        self.cwd = command.cwd
        self.env = command.env

        # Actual command execution
        cmdline = " ".join(shlex.quote(cmd) for cmd in command.command)
        self.script += "(set -ex; {})".format(cmdline)

        # Error handling: report the failing command's label and stop the batch
        label = command.label or cmdline
        quoted_label = shlex.quote("'{}'".format(label))
        self.script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format(quoted_label)

    def create_empty_file(self, name):
        self.script += "touch -- {}\n".format(shlex.quote(name))