#
# Copyright (C) 2017-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#     Tristan Maat <tristan.maat@codethink.co.uk>

import os
import string
from collections import namedtuple
from collections.abc import Mapping

from ..element import _KeyStrength
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .. import utils
from .. import _yaml


# An ArtifactCacheSpec holds the user configuration for a single remote
# artifact cache.
#
# Args:
#     url (str): Location of the remote artifact cache
#     push (bool): Whether we should attempt to push artifacts to this cache,
#                  in addition to pulling from it.
#
class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push')):

    # _new_from_config_node
    #
    # Creates an ArtifactCacheSpec() from a YAML loaded node
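    #
    # For example, a spec node loaded from user configuration might look
    # like this (hypothetical URL):
    #
    #     url: https://cache.example.com/artifacts
    #     push: true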
    #
    @staticmethod
    def _new_from_config_node(spec_node):
        _yaml.node_validate(spec_node, ['url', 'push'])
        url = _yaml.node_get(spec_node, str, 'url')
        push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
        if not url:
            provenance = _yaml.node_get_provenance(spec_node)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: empty artifact cache URL".format(provenance))
        return ArtifactCacheSpec(url, push)


# An ArtifactCache manages artifacts.
#
# Args:
#     context (Context): The BuildStream context
#
class ArtifactCache():
    def __init__(self, context):
        self.context = context
        self.required_artifacts = set()
        self.extractdir = os.path.join(context.artifactdir, 'extract')
        self.max_size = context.cache_quota
        self.estimated_size = None

        self.global_remote_specs = []
        self.project_remote_specs = {}

        self._local = False

        os.makedirs(context.artifactdir, exist_ok=True)

    ################################################
    #  Methods implemented on the abstract class   #
    ################################################

    # get_artifact_fullname()
    #
    # Generate a full name for an artifact, including the
    # project namespace, element name and cache key.
    #
    # This can also be used as a relative path safely, and
    # will normalize parts of the element name such that only
    # digits, letters and some select characters are allowed.
    #
    # Args:
    #     element (Element): The Element object
    #     key (str): The element's cache key
    #
    # Returns:
    #     (str): The relative path for the artifact
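    #
    # For example (hypothetical names), an element whose normal_name is
    # 'base/alpine' in a project named 'foo', with key 'abcd1234', maps to:
    #
    #     'foo/base_alpine/abcd1234'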
    #
    def get_artifact_fullname(self, element, key):
        project = element._get_project()

        # Normalize ostree ref unsupported chars
        valid_chars = string.digits + string.ascii_letters + '-._'
        element_name = ''.join([
            x if x in valid_chars else '_'
            for x in element.normal_name
        ])

        assert key is not None

        # assume project and element names are not allowed to contain slashes
        return '{0}/{1}/{2}'.format(project.name, element_name, key)

    # setup_remotes():
    #
    # Sets up which remotes to use
    #
    # Args:
    #     use_config (bool): Whether to use project configuration
    #     remote_url (str): Remote artifact cache URL
    #
    # This requires that all of the projects which are to be processed in the
    # session have already been loaded and are observable in the Context.
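    #
    # For example (hypothetical variable name and URL):
    #
    #     artifacts.setup_remotes(use_config=True)
    #     artifacts.setup_remotes(remote_url='https://cache.example.com')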
    #
    def setup_remotes(self, *, use_config=False, remote_url=None):

        # Initialize remote artifact caches. We allow the commandline to override
        # the user config in some cases (for example `bst push --remote=...`).
        has_remote_caches = False
        if remote_url:
            self._set_remotes([ArtifactCacheSpec(remote_url, push=True)])
            has_remote_caches = True
        if use_config:
            for project in self.context.get_projects():
                artifact_caches = _configured_remote_artifact_cache_specs(self.context, project)
                if artifact_caches:  # artifact_caches is a list of ArtifactCacheSpec instances
                    self._set_remotes(artifact_caches, project=project)
                    has_remote_caches = True
        if has_remote_caches:
            self._initialize_remotes()

    # specs_from_config_node()
    #
    # Parses the configuration of remote artifact caches from a config block.
    #
    # Args:
    #     config_node (dict): The config block, which may contain the 'artifacts' key
    #
    # Returns:
    #     A list of ArtifactCacheSpec instances.
    #
    # Raises:
    #     LoadError, if the config block contains invalid keys.
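    #
    # Both of the following (hypothetical) configurations are accepted, the
    # first as a single mapping and the second as a list of mappings:
    #
    #     artifacts:
    #       url: https://cache.example.com/artifacts
    #
    #     artifacts:
    #     - url: https://cache.example.com/artifacts
    #     - url: https://fallback.example.com/artifacts
    #       push: true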
    #
    @staticmethod
    def specs_from_config_node(config_node):
        cache_specs = []

        artifacts = config_node.get('artifacts', [])
        if isinstance(artifacts, Mapping):
            cache_specs.append(ArtifactCacheSpec._new_from_config_node(artifacts))
        elif isinstance(artifacts, list):
            for spec_node in artifacts:
                cache_specs.append(ArtifactCacheSpec._new_from_config_node(spec_node))
        else:
            provenance = _yaml.node_get_provenance(config_node, key='artifacts')
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: 'artifacts' must be a single 'url:' mapping, or a list of mappings"
                            .format(provenance))
        return cache_specs

    # append_required_artifacts():
    #
    # Append to the list of elements whose artifacts are required for
    # the current run. Artifacts whose elements are in this list will
    # be locked by the artifact cache and not touched for the duration
    # of the current pipeline.
    #
    # Args:
    #     elements (iterable): A set of elements to mark as required
    #
    def append_required_artifacts(self, elements):
        # We lock both strong and weak keys - deleting one but not the
        # other won't save space in most cases anyway, but would be a
        # user inconvenience.

        for element in elements:
            strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
            weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)

            for key in (strong_key, weak_key):
                if key and key not in self.required_artifacts:
                    self.required_artifacts.add(key)

                    # We also update the usage times of any artifacts
                    # we will be using, which helps prevent a buildstream
                    # process running in parallel with this one from
                    # removing artifacts that are in use.
                    try:
                        self.update_atime(element, key)
                    except FileNotFoundError:
                        pass

    # clean():
    #
    # Clean the artifact cache as much as possible.
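    #
    # Artifacts are removed in LRU order until the cache size drops below
    # cache_quota - cache_lower_threshold. For example (hypothetical numbers),
    # with a 10GB quota and a 2GB lower threshold, eviction stops once the
    # cache size falls under 8GB.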
    #
    def clean(self):
        artifacts = self.list_artifacts()

        while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
            try:
                to_remove = artifacts.pop(0)
            except IndexError:
                # If too many artifacts are required, and we therefore
                # can't remove them, we have to abort the build.
                #
                # FIXME: Asking the user what to do may be neater
                default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
                                            'buildstream.conf')
                detail = ("There is not enough space to build the given element.\n"
                          "Please increase the cache-quota in {}."
                          .format(self.context.config_origin or default_conf))

                if self.calculate_cache_size() > self.context.cache_quota:
                    raise ArtifactError("Cache too full. Aborting.",
                                        detail=detail,
                                        reason="cache-too-full")
                else:
                    break

            key = to_remove.rpartition('/')[2]
            if key not in self.required_artifacts:
                self.remove(to_remove)

        # This should be O(1) if implemented correctly
        return self.calculate_cache_size()

    # get_approximate_cache_size()
    #
    # A cheap method that aims to serve as an upper limit on the
    # artifact cache size.
    #
    # The cache size reported by this function will normally be larger
    # than the real cache size, since it is calculated using the
    # pre-commit artifact size, but for very small artifacts in
    # certain caches additional overhead could cause this to be
    # smaller than, but close to, the actual size.
    #
    # Nonetheless, in practice this should be safe to use as an upper
    # limit on the cache size.
    #
    # If the cache has built-in constant-time size reporting, please
    # feel free to override this method with a more accurate
    # implementation.
    #
    # Returns:
    #     (int) An approximation of the artifact cache size.
    #
    def get_approximate_cache_size(self):
        # If we don't currently have an estimate, figure out the real
        # cache size.
        if self.estimated_size is None:
            self.estimated_size = self.calculate_cache_size()

        return self.estimated_size

    ################################################
    # Abstract methods for subclasses to implement #
    ################################################
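
    # As a (hypothetical) illustration, a minimal in-memory subclass that
    # never caches anything could override the required methods like so:
    #
    #     class DummyCache(ArtifactCache):
    #
    #         def contains(self, element, key):
    #             return False
    #
    #         def list_artifacts(self):
    #             return []
    #
    #         def calculate_cache_size(self):
    #             return 0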

    # initialize_remotes():
    #
    # This will contact each remote cache.
    #
    # Args:
    #     on_failure (callable): Called if we fail to contact one of the caches.
    #
    def initialize_remotes(self, *, on_failure=None):
        pass

    # contains():
    #
    # Check whether the artifact for the specified Element is already available
    # in the local artifact cache.
    #
    # Args:
    #     element (Element): The Element to check
    #     key (str): The cache key to use
    #
    # Returns: True if the artifact is in the cache, False otherwise
    #
    def contains(self, element, key):
        raise ImplError("Cache '{kind}' does not implement contains()"
                        .format(kind=type(self).__name__))

    # list_artifacts():
    #
    # List artifacts in this cache in LRU order.
    #
    # Returns:
    #     ([str]) - A list of artifact names as generated by
    #               `ArtifactCache.get_artifact_fullname` in LRU order
    #
    def list_artifacts(self):
        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
                        .format(kind=type(self).__name__))

    # remove():
    #
    # Removes the artifact with the specified name from the local
    # artifact cache.
    #
    # Args:
    #     artifact_name (str): The name of the artifact to remove (as
    #                          generated by `ArtifactCache.get_artifact_fullname`)
    #
    def remove(self, artifact_name):
        raise ImplError("Cache '{kind}' does not implement remove()"
                        .format(kind=type(self).__name__))

    # update_atime():
    #
    # Update the access time of an element.
    #
    # Args:
    #     element (Element): The Element to mark
    #     key (str): The cache key to use
    #
    def update_atime(self, element, key):
        raise ImplError("Cache '{kind}' does not implement update_atime()"
                        .format(kind=type(self).__name__))

    # extract():
    #
    # Extract cached artifact for the specified Element if it hasn't
    # already been extracted.
    #
    # Assumes artifact has previously been fetched or committed.
    #
    # Args:
    #     element (Element): The Element to extract
    #     key (str): The cache key to use
    #
    # Raises:
    #     ArtifactError: If there was an OSError, or if the artifact
    #                    did not exist.
    #
    # Returns: path to extracted artifact
    #
    def extract(self, element, key):
        raise ImplError("Cache '{kind}' does not implement extract()"
                        .format(kind=type(self).__name__))

    # commit():
    #
    # Commit built artifact to cache.
    #
    # Args:
    #     element (Element): The Element to commit an artifact for
    #     content (str): The element's content directory
    #     keys (list): The cache keys to use
    #
    def commit(self, element, content, keys):
        raise ImplError("Cache '{kind}' does not implement commit()"
                        .format(kind=type(self).__name__))

    # can_diff():
    #
    # Whether this cache implementation can diff (unfortunately
    # there's no way to tell if an implementation is going to throw
    # ImplError without abc).
    #
    def can_diff(self):
        return False

    # diff():
    #
    # Return a list of files that have been added or modified between
    # the artifacts described by key_a and key_b.
    #
    # Args:
    #     element (Element): The element whose artifacts to compare
    #     key_a (str): The first artifact key
    #     key_b (str): The second artifact key
    #     subdir (str): A subdirectory to limit the comparison to
    #
    def diff(self, element, key_a, key_b, *, subdir=None):
        raise ImplError("Cache '{kind}' does not implement diff()"
                        .format(kind=type(self).__name__))

    # has_fetch_remotes():
    #
    # Check whether any remote repositories are available for fetching.
    #
    # Args:
    #     element (Element): The Element to check
    #
    # Returns: True if any remote repositories are configured, False otherwise
    #
    def has_fetch_remotes(self, *, element=None):
        return False

    # has_push_remotes():
    #
    # Check whether any remote repositories are available for pushing.
    #
    # Args:
    #     element (Element): The Element to check
    #
    # Returns: True if any remote repository is configured, False otherwise
    #
    def has_push_remotes(self, *, element=None):
        return False

    # push():
    #
    # Push committed artifact to remote repository.
    #
    # Args:
    #     element (Element): The Element whose artifact is to be pushed
    #     keys (list): The cache keys to use
    #
    # Returns:
    #     (bool): True if any remote was updated, False if no pushes were required
    #
    # Raises:
    #     (ArtifactError): if there was an error
    #
    def push(self, element, keys):
        raise ImplError("Cache '{kind}' does not implement push()"
                        .format(kind=type(self).__name__))

    # pull():
    #
    # Pull artifact from one of the configured remote repositories.
    #
    # Args:
    #     element (Element): The Element whose artifact is to be fetched
    #     key (str): The cache key to use
    #     progress (callable): The progress callback, if any
    #
    # Returns:
    #     (bool): True if pull was successful, False if artifact was not available
    #
    def pull(self, element, key, *, progress=None):
        raise ImplError("Cache '{kind}' does not implement pull()"
                        .format(kind=type(self).__name__))

    # link_key():
    #
    # Add a key for an existing artifact.
    #
    # Args:
    #     element (Element): The Element whose artifact is to be linked
    #     oldkey (str): An existing cache key for the artifact
    #     newkey (str): A new cache key for the artifact
    #
    def link_key(self, element, oldkey, newkey):
        raise ImplError("Cache '{kind}' does not implement link_key()"
                        .format(kind=type(self).__name__))

    # calculate_cache_size()
    #
    # Return the real artifact cache size.
    #
    # Implementations should also use this to update estimated_size.
    #
    # Returns:
    #     (int) The size of the artifact cache.
    #
    def calculate_cache_size(self):
        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
                        .format(kind=type(self).__name__))

    ################################################
    #               Local Private Methods          #
    ################################################

    # _message()
    #
    # Local message propagator
    #
    def _message(self, message_type, message, **kwargs):
        args = dict(kwargs)
        self.context.message(
            Message(None, message_type, message, **args))

    # _set_remotes():
    #
    # Set the list of remote caches. If project is None, the global list of
    # remote caches will be set, which is used by all projects. If a project is
    # specified, the per-project list of remote caches will be set.
    #
    # Args:
    #     remote_specs (list): List of ArtifactCacheSpec instances, in priority order.
    #     project (Project): The Project instance for project-specific remotes
    #
    def _set_remotes(self, remote_specs, *, project=None):
        if project is None:
            # global remotes
            self.global_remote_specs = remote_specs
        else:
            self.project_remote_specs[project] = remote_specs

    # _initialize_remotes()
    #
    # An internal wrapper which calls the abstract method and
    # takes care of messaging
    #
    def _initialize_remotes(self):
        def remote_failed(url, error):
            self._message(MessageType.WARN, "Failed to fetch remote refs from {}: {}".format(url, error))

        with self.context.timed_activity("Initializing remote caches", silent_nested=True):
            self.initialize_remotes(on_failure=remote_failed)

    # _add_artifact_size()
    #
    # Since we cannot keep track of the cache size between processes,
    # this method will be called by the main process every time a
    # process that added something to the cache finishes.
    #
    # This will then add the reported size to
    # ArtifactCache.estimated_size.
    #
    def _add_artifact_size(self, artifact_size):
        if self.estimated_size is None:
            self.estimated_size = self.calculate_cache_size()

        self.estimated_size += artifact_size

    # _set_cache_size()
    #
    # Similarly to the above method, when we calculate the actual size
    # in a child process, we can't update it there. We instead pass the
    # value back to the main process and update it here.
    #
    def _set_cache_size(self, cache_size):
        self.estimated_size = cache_size


# _configured_remote_artifact_cache_specs():
#
# Return the list of configured artifact remotes for a given project, in priority
# order. This takes into account the user and project configuration.
#
# Args:
#     context (Context): The BuildStream context
#     project (Project): The BuildStream project
#
# Returns:
#     A list of ArtifactCacheSpec instances describing the remote artifact caches.
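#
# For example, if the project override, the project configuration and the
# user configuration all list the same (hypothetical) cache URL, the
# deduplicated result contains it only once, at its first occurrence in
# the combined, priority-ordered list.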
#
def _configured_remote_artifact_cache_specs(context, project):
    project_overrides = context.get_overrides(project.name)
    project_extra_specs = ArtifactCache.specs_from_config_node(project_overrides)

    return list(utils._deduplicate(
        project_extra_specs + project.artifact_cache_specs + context.artifact_cache_specs))