| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # Authors: |
| # Tristan Maat <tristan.maat@codethink.co.uk> |
| |
| import os |
| |
| from ._assetcache import AssetCache |
| from ._cas.casremote import BlobNotFound |
| from ._exceptions import ArtifactError, AssetCacheError, CASError, CASRemoteError |
| from ._protos.buildstream.v2 import artifact_pb2 |
| |
| from . import utils |
| |
| REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}" |
| |
| |
| # An ArtifactCache manages artifacts. |
| # |
| # Args: |
| # context (Context): The BuildStream context |
| # |
| class ArtifactCache(AssetCache): |
| def __init__(self, context): |
| super().__init__(context) |
| |
| # create artifact directory |
| self._basedir = context.artifactdir |
| os.makedirs(self._basedir, exist_ok=True) |
| |
| # preflight(): |
| # |
| # Preflight check. |
| # |
| def preflight(self): |
| self.cas.preflight() |
| |
| # contains(): |
| # |
| # Check whether the artifact for the specified Element is already available |
| # in the local artifact cache. |
| # |
| # Args: |
| # element (Element): The Element to check |
| # key (str): The cache key to use |
| # |
| # Returns: True if the artifact is in the cache, False otherwise |
| # |
| def contains(self, element, key): |
| ref = element.get_artifact_name(key) |
| |
| return os.path.exists(os.path.join(self._basedir, ref)) |
| |
| # list_artifacts(): |
| # |
| # List artifacts in this cache in LRU order. |
| # |
| # Args: |
| # glob (str): An option glob expression to be used to list artifacts satisfying the glob |
| # |
| # Returns: |
| # ([str]) - A list of artifact names as generated in LRU order |
| # |
| def list_artifacts(self, *, glob=None): |
| return [ref for _, ref in sorted(list(self.list_refs_mtimes(self._basedir, glob_expr=glob)))] |
| |
| # remove(): |
| # |
| # Removes the artifact for the specified ref from the local |
| # artifact cache. |
| # |
| # Args: |
| # ref (artifact_name): The name of the artifact to remove (as |
| # generated by `Element.get_artifact_name`) |
| # |
| def remove(self, ref): |
| try: |
| self.remove_ref(ref) |
| except AssetCacheError as e: |
| raise ArtifactError("{}".format(e)) from e |
| |
| # push(): |
| # |
| # Push committed artifact to remote repository. |
| # |
| # Args: |
| # element (Element): The Element whose artifact is to be pushed |
| # artifact (Artifact): The artifact being pushed |
| # |
| # Returns: |
| # (bool): True if any remote was updated, False if no pushes were required |
| # |
| # Raises: |
| # (ArtifactError): if there was an error |
| # |
| def push(self, element, artifact): |
| project = element._get_project() |
| display_key = element._get_display_key() |
| |
| index_remotes, storage_remotes = self.get_remotes(project.name, True) |
| artifact_proto = artifact._get_proto() |
| artifact_digest = self.cas.add_object(buffer=artifact_proto.SerializeToString()) |
| |
| pushed = False |
| |
| # First push our files to all storage remotes, so that they |
| # can perform file checks on their end |
| for remote in storage_remotes: |
| remote.init() |
| element.status("Pushing data from artifact {} -> {}".format(display_key.brief, remote)) |
| |
| if self._push_artifact_blobs(artifact, artifact_digest, remote): |
| element.info("Pushed data from artifact {} -> {}".format(display_key.brief, remote)) |
| else: |
| element.info( |
| "Remote ({}) already has all data of artifact {} cached".format(remote, display_key.brief) |
| ) |
| |
| for remote in index_remotes: |
| remote.init() |
| element.status("Pushing artifact {} -> {}".format(display_key.brief, remote)) |
| |
| if self._push_artifact_proto(element, artifact, artifact_digest, remote): |
| element.info("Pushed artifact {} -> {}".format(display_key.brief, remote)) |
| pushed = True |
| else: |
| element.info("Remote ({}) already has artifact {} cached".format(remote, display_key.brief)) |
| |
| return pushed |
| |
| # pull(): |
| # |
| # Pull artifact from one of the configured remote repositories. |
| # |
| # Args: |
| # element (Element): The Element whose artifact is to be fetched |
| # key (str): The cache key to use |
| # pull_buildtrees (bool): Whether to pull buildtrees or not |
| # |
| # Returns: |
| # (bool): True if pull was successful, False if artifact was not available |
| # |
| def pull(self, element, key, *, pull_buildtrees=False): |
| artifact_digest = None |
| display_key = key[: self.context.log_key_length] |
| project = element._get_project() |
| |
| artifact_name = element.get_artifact_name(key=key) |
| uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name) |
| |
| index_remotes, storage_remotes = self.get_remotes(project.name, False) |
| |
| errors = [] |
| # Start by pulling our artifact proto, so that we know which |
| # blobs to pull |
| for remote in index_remotes: |
| remote.init() |
| try: |
| element.status("Pulling artifact {} <- {}".format(display_key, remote)) |
| response = remote.fetch_blob([uri]) |
| if response: |
| artifact_digest = response.blob_digest |
| break |
| |
| element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key)) |
| except AssetCacheError as e: |
| element.warn("Could not pull from remote {}: {}".format(remote, e)) |
| errors.append(e) |
| |
| if errors and not artifact_digest: |
| raise ArtifactError( |
| "Failed to pull artifact {}".format(display_key), |
| detail="\n".join(str(e) for e in errors), |
| temporary=True, |
| ) |
| |
| # If we don't have an artifact, we can't exactly pull our |
| # artifact |
| if not artifact_digest: |
| return False |
| |
| errors = [] |
| # If we do, we can pull it! |
| for remote in storage_remotes: |
| remote.init() |
| try: |
| element.status("Pulling data for artifact {} <- {}".format(display_key, remote)) |
| |
| if self._pull_artifact_storage(element, key, artifact_digest, remote, pull_buildtrees=pull_buildtrees): |
| element.info("Pulled artifact {} <- {}".format(display_key, remote)) |
| return True |
| |
| element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key)) |
| except BlobNotFound as e: |
| # Not all blobs are available on this remote |
| element.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob)) |
| continue |
| except CASError as e: |
| element.warn("Could not pull from remote {}: {}".format(remote, e)) |
| errors.append(e) |
| |
| if errors: |
| raise ArtifactError( |
| "Failed to pull artifact {}".format(display_key), |
| detail="\n".join(str(e) for e in errors), |
| temporary=True, |
| ) |
| |
| return False |
| |
| # link_key(): |
| # |
| # Add a key for an existing artifact. |
| # |
| # Args: |
| # element (Element): The Element whose artifact is to be linked |
| # oldkey (str): An existing cache key for the artifact |
| # newkey (str): A new cache key for the artifact |
| # |
| def link_key(self, element, oldkey, newkey): |
| oldref = element.get_artifact_name(oldkey) |
| newref = element.get_artifact_name(newkey) |
| |
| if oldref == newref: |
| # The two refs are identical, nothing to do |
| return |
| |
| utils.safe_link(os.path.join(self._basedir, oldref), os.path.join(self._basedir, newref)) |
| |
| # fetch_missing_blobs(): |
| # |
| # Fetch missing blobs from configured remote repositories. |
| # |
| # Args: |
| # project (Project): The current project |
| # missing_blobs (list): The Digests of the blobs to fetch |
| # |
| def fetch_missing_blobs(self, project, missing_blobs): |
| |
| index_remotes, _ = self.get_remotes(project.name, False) |
| for remote in index_remotes: |
| if not missing_blobs: |
| break |
| |
| remote.init() |
| |
| # fetch_blobs() will return the blobs that are still missing |
| missing_blobs = self.cas.fetch_blobs(remote, missing_blobs, allow_partial=True) |
| |
| if missing_blobs: |
| raise ArtifactError("Blobs not found on configured artifact servers") |
| |
| # find_missing_blobs(): |
| # |
| # Find missing blobs from configured push remote repositories. |
| # |
| # Args: |
| # project (Project): The current project |
| # missing_blobs (list): The Digests of the blobs to check |
| # |
| # Returns: |
| # (list): The Digests of the blobs missing on at least one push remote |
| # |
| def find_missing_blobs(self, project, missing_blobs): |
| if not missing_blobs: |
| return [] |
| |
| _, push_remotes = self.get_remotes(project.name, True) |
| remote_missing_blobs_list = [] |
| |
| for remote in push_remotes: |
| remote.init() |
| |
| remote_missing_blobs = self.cas.missing_blobs(missing_blobs, remote=remote) |
| |
| for blob in remote_missing_blobs: |
| if blob not in remote_missing_blobs_list: |
| remote_missing_blobs_list.append(blob) |
| |
| return remote_missing_blobs_list |
| |
| # check_remotes_for_element() |
| # |
| # Check if the element is available in any of the remotes |
| # |
| # Args: |
| # element (Element): The element to check |
| # |
| # Returns: |
| # (bool): True if the element is available remotely |
| # |
| def check_remotes_for_element(self, element): |
| project = element._get_project() |
| index_remotes, _ = self.get_remotes(project.name, False) |
| |
| # If there are no remotes |
| if not index_remotes: |
| return False |
| |
| ref = element.get_artifact_name() |
| for remote in index_remotes: |
| remote.init() |
| |
| if self._query_remote(ref, remote): |
| return True |
| |
| return False |
| |
| ################################################ |
| # Local Private Methods # |
| ################################################ |
| |
| # _push_artifact_blobs() |
| # |
| # Push the blobs that make up an artifact to the remote server. |
| # |
| # Args: |
| # artifact (Artifact): The artifact whose blobs to push. |
| # remote (CASRemote): The remote to push the blobs to. |
| # |
| # Returns: |
| # (bool) - True if we uploaded anything, False otherwise. |
| # |
| # Raises: |
| # ArtifactError: If we fail to push blobs (*unless* they're |
| # already there or we run out of space on the server). |
| # |
| def _push_artifact_blobs(self, artifact, artifact_digest, remote): |
| artifact_proto = artifact._get_proto() |
| |
| try: |
| if str(artifact_proto.files): |
| self.cas._send_directory(remote, artifact_proto.files) |
| |
| if str(artifact_proto.buildtree): |
| try: |
| self.cas._send_directory(remote, artifact_proto.buildtree) |
| except FileNotFoundError: |
| pass |
| |
| if str(artifact_proto.buildroot): |
| try: |
| self.cas._send_directory(remote, artifact_proto.buildroot) |
| except FileNotFoundError: |
| pass |
| |
| digests = [artifact_digest, artifact_proto.low_diversity_meta, artifact_proto.high_diversity_meta] |
| |
| if str(artifact_proto.public_data): |
| digests.append(artifact_proto.public_data) |
| |
| for log_file in artifact_proto.logs: |
| digests.append(log_file.digest) |
| |
| self.cas.send_blobs(remote, digests) |
| |
| except CASRemoteError as cas_error: |
| if cas_error.reason != "cache-too-full": |
| raise ArtifactError("Failed to push artifact blobs: {}".format(cas_error), temporary=True) |
| return False |
| |
| return True |
| |
| # _push_artifact_proto() |
| # |
| # Pushes the artifact proto to remote. |
| # |
| # Args: |
| # element (Element): The element |
| # artifact (Artifact): The related artifact being pushed |
| # remote (AssetRemote): Remote to push to |
| # |
| # Returns: |
| # (bool): Whether we pushed the artifact. |
| # |
| # Raises: |
| # ArtifactError: If the push fails for any reason except the |
| # artifact already existing. |
| # |
| def _push_artifact_proto(self, element, artifact, artifact_digest, remote): |
| |
| artifact_proto = artifact._get_proto() |
| |
| keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key])) |
| artifact_names = [element.get_artifact_name(key=key) for key in keys] |
| uris = [REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name) for artifact_name in artifact_names] |
| |
| try: |
| response = remote.fetch_blob(uris) |
| # Skip push if artifact is already on the server |
| if response and response.blob_digest == artifact_digest: |
| return False |
| except AssetCacheError as e: |
| raise ArtifactError("{}".format(e), temporary=True) from e |
| |
| referenced_directories = [] |
| if artifact_proto.files: |
| referenced_directories.append(artifact_proto.files) |
| if artifact_proto.buildtree: |
| referenced_directories.append(artifact_proto.buildtree) |
| if artifact_proto.sources: |
| referenced_directories.append(artifact_proto.sources) |
| if artifact_proto.buildroot: |
| referenced_directories.append(artifact_proto.buildroot) |
| |
| referenced_blobs = [artifact_proto.low_diversity_meta, artifact_proto.high_diversity_meta] + [ |
| log_file.digest for log_file in artifact_proto.logs |
| ] |
| |
| try: |
| remote.push_blob( |
| uris, |
| artifact_digest, |
| references_blobs=referenced_blobs, |
| references_directories=referenced_directories, |
| ) |
| except AssetCacheError as e: |
| raise ArtifactError("{}".format(e), temporary=True) from e |
| |
| return True |
| |
| # _pull_artifact_storage(): |
| # |
| # Pull artifact blobs from the given remote. |
| # |
| # Args: |
| # element (Element): element to pull |
| # key (str): The specific key for the artifact to pull |
| # remote (CASRemote): remote to pull from |
| # pull_buildtree (bool): whether to pull buildtrees or not |
| # |
| # Returns: |
| # (bool): True if we pulled any blobs. |
| # |
| # Raises: |
| # ArtifactError: If the pull failed for any reason except the |
| # blobs not existing on the server. |
| # |
| def _pull_artifact_storage(self, element, key, artifact_digest, remote, pull_buildtrees=False): |
| artifact_name = element.get_artifact_name(key=key) |
| |
| try: |
| # Fetch and parse artifact proto |
| self.cas.fetch_blobs(remote, [artifact_digest]) |
| artifact = artifact_pb2.Artifact() |
| with self.cas.open(artifact_digest, "rb") as f: |
| artifact.ParseFromString(f.read()) |
| |
| # Write the artifact proto to cache |
| artifact_path = os.path.join(self._basedir, artifact_name) |
| os.makedirs(os.path.dirname(artifact_path), exist_ok=True) |
| with utils.save_file_atomic(artifact_path, mode="wb") as f: |
| f.write(artifact.SerializeToString()) |
| |
| if str(artifact.files): |
| self.cas._fetch_directory(remote, artifact.files) |
| |
| if pull_buildtrees: |
| if str(artifact.buildtree): |
| self.cas._fetch_directory(remote, artifact.buildtree) |
| if str(artifact.buildroot): |
| self.cas._fetch_directory(remote, artifact.buildroot) |
| |
| digests = [artifact.low_diversity_meta, artifact.high_diversity_meta] |
| if str(artifact.public_data): |
| digests.append(artifact.public_data) |
| |
| for log_digest in artifact.logs: |
| digests.append(log_digest.digest) |
| |
| self.cas.fetch_blobs(remote, digests) |
| except BlobNotFound: |
| return False |
| except CASRemoteError as e: |
| raise ArtifactError("{}".format(e), temporary=True) from e |
| |
| return True |
| |
| # _query_remote() |
| # |
| # Args: |
| # ref (str): The artifact ref |
| # remote (AssetRemote): The remote we want to check |
| # |
| # Returns: |
| # (bool): True if the ref exists in the remote, False otherwise. |
| # |
| def _query_remote(self, ref, remote): |
| uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(ref) |
| |
| try: |
| response = remote.fetch_blob([uri]) |
| return bool(response) |
| except AssetCacheError as e: |
| raise ArtifactError("{}".format(e), temporary=True) from e |