| # |
| # Copyright (C) 2019-2020 Bloomberg Finance LP |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this library. If not, see <http://www.gnu.org/licenses/>. |
| # |
| import os |
| import grpc |
| |
| from ._cas.casremote import BlobNotFound |
| from ._assetcache import AssetCache |
| from ._exceptions import AssetCacheError, CASError, CASRemoteError, SourceCacheError |
| from . import utils |
| from ._protos.buildstream.v2 import source_pb2 |
| |
| REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}" |
| |
| |
| # Class that keeps config of remotes and deals with caching of sources. |
| # |
| # Args: |
| # context (Context): The Buildstream context |
| # |
| class ElementSourcesCache(AssetCache): |
| def __init__(self, context): |
| super().__init__(context) |
| |
| self._basedir = os.path.join(context.cachedir, "elementsources") |
| os.makedirs(self._basedir, exist_ok=True) |
| |
| # load_proto(): |
| # |
| # Load source proto from local cache. |
| # |
| # Args: |
| # sources (ElementSources): The sources whose proto we want to load |
| # |
| def load_proto(self, sources): |
| ref = sources.get_cache_key() |
| path = self._source_path(ref) |
| |
| if not os.path.exists(path): |
| return None |
| |
| source_proto = source_pb2.Source() |
| with open(path, "r+b") as f: |
| source_proto.ParseFromString(f.read()) |
| return source_proto |
| |
| def store_proto(self, sources, proto): |
| ref = sources.get_cache_key() |
| path = self._source_path(ref) |
| |
| with utils.save_file_atomic(path, "w+b") as f: |
| f.write(proto.SerializeToString()) |
| |
| # pull(): |
| # |
| # Attempts to pull sources from configured remote source caches. |
| # |
| # Args: |
| # sources (ElementSources): The sources we want to fetch |
| # |
| # Returns: |
| # (bool): True if pull successful, False if not |
| # |
| def pull(self, sources, plugin): |
| project = sources.get_project() |
| |
| ref = sources.get_cache_key() |
| display_key = sources.get_brief_display_key() |
| |
| uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref) |
| |
| index_remotes, storage_remotes = self.get_remotes(project.name, False) |
| |
| source_digest = None |
| errors = [] |
| # Start by pulling our source proto, so that we know which |
| # blobs to pull |
| for remote in index_remotes: |
| remote.init() |
| try: |
| plugin.status("Pulling source {} <- {}".format(display_key, remote)) |
| response = remote.fetch_blob([uri]) |
| if response: |
| source_digest = response.blob_digest |
| break |
| |
| plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key)) |
| except AssetCacheError as e: |
| plugin.warn("Could not pull from remote {}: {}".format(remote, e)) |
| errors.append(e) |
| |
| if errors and not source_digest: |
| raise SourceCacheError( |
| "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors) |
| ) |
| |
| # If we don't have a source proto, we can't pull source files |
| if not source_digest: |
| return False |
| |
| errors = [] |
| for remote in storage_remotes: |
| remote.init() |
| try: |
| plugin.status("Pulling data for source {} <- {}".format(display_key, remote)) |
| |
| if self._pull_source_storage(ref, source_digest, remote): |
| plugin.info("Pulled source {} <- {}".format(display_key, remote)) |
| return True |
| |
| plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key)) |
| except BlobNotFound as e: |
| # Not all blobs are available on this remote |
| plugin.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob)) |
| continue |
| except CASError as e: |
| plugin.warn("Could not pull from remote {}: {}".format(remote, e)) |
| errors.append(e) |
| |
| if errors: |
| raise SourceCacheError( |
| "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors) |
| ) |
| |
| return False |
| |
| # push(): |
| # |
| # Push sources to remote repository. |
| # |
| # Args: |
| # sources (ElementSources): The sources to be pushed |
| # |
| # Returns: |
| # (bool): True if any remote was updated, False if no pushes were required |
| # |
| # Raises: |
| # (SourceCacheError): if there was an error |
| # |
| def push(self, sources, plugin): |
| project = sources.get_project() |
| |
| ref = sources.get_cache_key() |
| display_key = sources.get_brief_display_key() |
| |
| uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref) |
| |
| index_remotes, storage_remotes = self.get_remotes(project.name, True) |
| |
| source_proto = self.load_proto(sources) |
| source_digest = self.cas.add_object(buffer=source_proto.SerializeToString()) |
| |
| pushed = False |
| |
| # First push our files to all storage remotes, so that they |
| # can perform file checks on their end |
| for remote in storage_remotes: |
| remote.init() |
| plugin.status("Pushing data from source {} -> {}".format(display_key, remote)) |
| |
| if self._push_source_blobs(source_proto, source_digest, remote): |
| plugin.info("Pushed data from source {} -> {}".format(display_key, remote)) |
| else: |
| plugin.info("Remote ({}) already has all data of source {} cached".format(remote, display_key())) |
| |
| for remote in index_remotes: |
| remote.init() |
| plugin.status("Pushing source {} -> {}".format(display_key, remote)) |
| |
| if self._push_source_proto(uri, source_proto, source_digest, remote): |
| plugin.info("Pushed source {} -> {}".format(display_key, remote)) |
| pushed = True |
| else: |
| plugin.info("Remote ({}) already has source {} cached".format(remote, display_key)) |
| |
| return pushed |
| |
| def _get_source(self, ref): |
| path = self._source_path(ref) |
| source_proto = source_pb2.Source() |
| try: |
| with open(path, "r+b") as f: |
| source_proto.ParseFromString(f.read()) |
| return source_proto |
| except FileNotFoundError as e: |
| raise SourceCacheError("Attempted to access unavailable source: {}".format(e)) from e |
| |
| def _source_path(self, ref): |
| return os.path.join(self._basedir, ref) |
| |
| # _push_source_blobs() |
| # |
| # Push the blobs that make up an source to the remote server. |
| # |
| # Args: |
| # source_proto: The source proto whose blobs to push. |
| # source_digest: The digest of the source proto. |
| # remote (CASRemote): The remote to push the blobs to. |
| # |
| # Returns: |
| # (bool) - True if we uploaded anything, False otherwise. |
| # |
| # Raises: |
| # SourceCacheError: If we fail to push blobs (*unless* they're |
| # already there or we run out of space on the server). |
| # |
| def _push_source_blobs(self, source_proto, source_digest, remote): |
| try: |
| # Push source files |
| self.cas._send_directory(remote, source_proto.files) |
| # Push source proto |
| self.cas.send_blobs(remote, [source_digest]) |
| |
| except CASRemoteError as cas_error: |
| if cas_error.reason != "cache-too-full": |
| raise SourceCacheError("Failed to push source blobs: {}".format(cas_error)) |
| return False |
| except grpc.RpcError as e: |
| if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: |
| raise SourceCacheError( |
| "Failed to push source blobs with status {}: {}".format(e.code().name, e.details()) |
| ) |
| return False |
| |
| return True |
| |
| # _push_source_proto() |
| # |
| # Pushes the source proto to remote. |
| # |
| # Args: |
| # source_proto: The source proto. |
| # source_digest: The digest of the source proto. |
| # remote (AssetRemote): Remote to push to |
| # |
| # Returns: |
| # (bool): Whether we pushed the source. |
| # |
| # Raises: |
| # SourceCacheError: If the push fails for any reason except the |
| # source already existing. |
| # |
| def _push_source_proto(self, uri, source_proto, source_digest, remote): |
| try: |
| response = remote.fetch_blob([uri]) |
| # Skip push if source is already on the server |
| if response and response.blob_digest == source_digest: |
| return False |
| except grpc.RpcError as e: |
| if e.code() != grpc.StatusCode.NOT_FOUND: |
| raise SourceCacheError( |
| "Error checking source cache with status {}: {}".format(e.code().name, e.details()) |
| ) |
| |
| referenced_directories = [source_proto.files] |
| |
| try: |
| remote.push_blob( |
| [uri], source_digest, references_directories=referenced_directories, |
| ) |
| except grpc.RpcError as e: |
| raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details())) |
| |
| return True |
| |
| # _pull_source_storage(): |
| # |
| # Pull source blobs from the given remote. |
| # |
| # Args: |
| # key (str): The specific key for the source to pull |
| # remote (CASRemote): remote to pull from |
| # |
| # Returns: |
| # (bool): True if we pulled any blobs. |
| # |
| # Raises: |
| # SourceCacheError: If the pull failed for any reason except the |
| # blobs not existing on the server. |
| # |
| def _pull_source_storage(self, key, source_digest, remote): |
| try: |
| # Fetch and parse source proto |
| self.cas.fetch_blobs(remote, [source_digest]) |
| source = source_pb2.Source() |
| with self.cas.open(source_digest, "rb") as f: |
| source.ParseFromString(f.read()) |
| |
| # Write the source proto to cache |
| source_path = os.path.join(self._basedir, key) |
| with utils.save_file_atomic(source_path, mode="wb") as f: |
| f.write(source.SerializeToString()) |
| |
| self.cas._fetch_directory(remote, source.files) |
| except grpc.RpcError as e: |
| if e.code() != grpc.StatusCode.NOT_FOUND: |
| raise SourceCacheError("Failed to pull source with status {}: {}".format(e.code().name, e.details())) |
| return False |
| |
| return True |
| |
| def _push_source(self, source_ref, remote): |
| uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref) |
| |
| try: |
| remote.init() |
| source_proto = self._get_source(source_ref) |
| remote.push_directory([uri], source_proto.files) |
| return True |
| |
| except grpc.RpcError as e: |
| if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: |
| raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details())) |
| return False |