# blob: ba0261f37708c6e23f63ed69ef9957242bc5a37f [file] [log] [blame]
# Copyright (C) 2017-2018 Codethink Limited
# Copyright (C) 2019-2020 Bloomberg Finance LP
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Authors:
# Tristan Maat <>
import os
from ._assetcache import AssetCache
from ._cas.casremote import BlobNotFound
from ._exceptions import ArtifactError, AssetCacheError, CASError, CASRemoteError
from ._protos.buildstream.v2 import artifact_pb2
from . import utils
# An ArtifactCache manages artifacts.
# Args:
# context (Context): The BuildStream context
class ArtifactCache(AssetCache):
    # NOTE(review): The listing this class was restored from had lost its
    # indentation and a number of statements (`try:` / `else:` keywords,
    # one-line bodies, some call targets and some arguments). Every
    # statement that had to be inferred rather than copied is tagged
    # NOTE(review) below and should be confirmed against version control.
    # Statements that were dropped without leaving any trace cannot be
    # recovered here.
    #
    # REMOTE_ASSET_ARTIFACT_URN_TEMPLATE is used by several methods but is
    # neither defined nor imported in the visible part of the file; it is
    # assumed to be a module-level format string with one `{}` placeholder.

    def __init__(self, context):
        # NOTE(review): restored. `self.cas` and `self.context` are used by
        # the methods below but are never assigned in this class, so the
        # base class initialiser presumably has to run first.
        super().__init__(context)

        # create artifact directory
        self._basedir = context.artifactdir
        os.makedirs(self._basedir, exist_ok=True)

    # preflight():
    #
    # Preflight check.
    #
    def preflight(self):
        # NOTE(review): the body was missing; delegating to the CAS
        # preflight check is an assumption to be confirmed.
        self.cas.preflight()

    # contains():
    #
    # Check whether the artifact for the specified Element is already available
    # in the local artifact cache.
    #
    # Args:
    #     element (Element): The Element to check
    #     key (str): The cache key to use
    #
    # Returns: True if the artifact is in the cache, False otherwise
    #
    def contains(self, element, key):
        ref = element.get_artifact_name(key)

        # An artifact is "in the cache" when its ref file exists below the
        # artifact directory.
        return os.path.exists(os.path.join(self._basedir, ref))

    # list_artifacts():
    #
    # List artifacts in this cache in LRU order.
    #
    # Args:
    #     glob (str): An optional glob expression to be used to list artifacts satisfying the glob
    #
    # Returns:
    #     ([str]) - A list of artifact names as generated in LRU order
    #
    def list_artifacts(self, *, glob=None):
        # list_refs_mtimes() yields (mtime, ref) pairs; sorting the pairs
        # orders the refs by modification time.
        return [ref for _, ref in sorted(self.list_refs_mtimes(self._basedir, glob_expr=glob))]

    # remove():
    #
    # Removes the artifact for the specified ref from the local
    # artifact cache.
    #
    # Args:
    #     ref (artifact_name): The name of the artifact to remove (as
    #                          generated by `Element.get_artifact_name`)
    #
    # Raises:
    #     (ArtifactError): if the underlying asset cache reports an error
    #
    def remove(self, ref):
        try:
            # NOTE(review): the guarded call was missing; `remove_ref()` is
            # assumed to be the inherited helper that raises AssetCacheError.
            self.remove_ref(ref)
        except AssetCacheError as e:
            raise ArtifactError("{}".format(e)) from e

    # push():
    #
    # Push committed artifact to remote repository.
    #
    # Args:
    #     element (Element): The Element whose artifact is to be pushed
    #     artifact (Artifact): The artifact being pushed
    #
    # Returns:
    #     (bool): True if the artifact proto was pushed to at least one
    #             index remote, False if no pushes were required
    #
    # Raises:
    #     (ArtifactError): if there was an error
    #
    def push(self, element, artifact):
        project = element._get_project()
        display_key = element._get_display_key()

        # NOTE(review): the first argument was blank in the listing;
        # `project.name` is assumed (here and in every get_remotes() call).
        index_remotes, storage_remotes = self.get_remotes(project.name, True)
        artifact_proto = artifact._get_proto()
        artifact_digest = self.cas.add_object(buffer=artifact_proto.SerializeToString())

        pushed = False

        # First push our files to all storage remotes, so that they
        # can perform file checks on their end
        for remote in storage_remotes:
            element.status("Pushing data from artifact {} -> {}".format(display_key.brief, remote))

            # NOTE(review): the callee of the informational messages was
            # stripped from the listing; `element.info` is assumed.
            if self._push_artifact_blobs(artifact, artifact_digest, remote):
                element.info("Pushed data from artifact {} -> {}".format(display_key.brief, remote))
            else:
                element.info(
                    "Remote ({}) already has all data of artifact {} cached".format(remote, display_key.brief)
                )

        for remote in index_remotes:
            element.status("Pushing artifact {} -> {}".format(display_key.brief, remote))

            if self._push_artifact_proto(element, artifact, artifact_digest, remote):
                element.info("Pushed artifact {} -> {}".format(display_key.brief, remote))
                pushed = True
            else:
                element.info("Remote ({}) already has artifact {} cached".format(remote, display_key.brief))

        return pushed

    # pull():
    #
    # Pull artifact from one of the configured remote repositories.
    #
    # Args:
    #     element (Element): The Element whose artifact is to be fetched
    #     key (str): The cache key to use
    #     pull_buildtrees (bool): Whether to pull buildtrees or not
    #
    # Returns:
    #     (bool): True if pull was successful, False if artifact was not available
    #
    # Raises:
    #     (ArtifactError): if remotes reported errors and the artifact could
    #                      not be pulled from any of them
    #
    def pull(self, element, key, *, pull_buildtrees=False):
        artifact_digest = None
        display_key = key[: self.context.log_key_length]
        project = element._get_project()

        artifact_name = element.get_artifact_name(key=key)
        uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)

        index_remotes, storage_remotes = self.get_remotes(project.name, False)

        errors = []
        # Start by pulling our artifact proto, so that we know which
        # blobs to pull
        for remote in index_remotes:
            try:
                element.status("Pulling artifact {} <- {}".format(display_key, remote))
                response = remote.fetch_blob([uri])
                if response:
                    artifact_digest = response.blob_digest
                    break

                element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key))
            except AssetCacheError as e:
                element.warn("Could not pull from remote {}: {}".format(remote, e))
                # Remember the failure so it can be reported if no remote
                # ends up providing the artifact.
                errors.append(e)

        if errors and not artifact_digest:
            raise ArtifactError(
                "Failed to pull artifact {}".format(display_key),
                detail="\n".join(str(e) for e in errors),
            )

        # If we don't have an artifact, we can't exactly pull our
        # artifact
        if not artifact_digest:
            return False

        errors = []
        # If we do, we can pull it!
        for remote in storage_remotes:
            try:
                element.status("Pulling data for artifact {} <- {}".format(display_key, remote))

                if self._pull_artifact_storage(element, key, artifact_digest, remote, pull_buildtrees=pull_buildtrees):
                    element.info("Pulled artifact {} <- {}".format(display_key, remote))
                    return True

                element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key))
            except BlobNotFound as e:
                # Not all blobs are available on this remote
                element.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob))
            except CASError as e:
                element.warn("Could not pull from remote {}: {}".format(remote, e))
                errors.append(e)

        if errors:
            raise ArtifactError(
                "Failed to pull artifact {}".format(display_key),
                detail="\n".join(str(e) for e in errors),
            )

        return False

    # pull_tree():
    #
    # Pull a single Tree rather than an artifact.
    # Does not update local refs.
    #
    # Args:
    #     project (Project): The current project
    #     digest (Digest): The digest of the tree
    #
    # Returns:
    #     The value returned by the first remote that provided the tree,
    #     or None if no remote did
    #
    def pull_tree(self, project, digest):
        _, storage_remotes = self.get_remotes(project.name, False)
        for remote in storage_remotes:
            # Keep the requested digest intact: a remote that cannot serve
            # the tree must not change what the next remote is asked for.
            pulled = self.cas.pull_tree(remote, digest)

            if pulled:
                # no need to pull from additional remotes
                return pulled

        return None

    # push_message():
    #
    # Push the given protobuf message to all remotes.
    #
    # Args:
    #     project (Project): The current project
    #     message (Message): A protobuf message to push.
    #
    # Returns:
    #     The digest returned by the last push remote
    #
    # Raises:
    #     (ArtifactError): if there was an error
    #
    def push_message(self, project, message):
        _, push_remotes = self.get_remotes(project.name, True)

        if not push_remotes:
            raise ArtifactError(
                "push_message was called, but no remote artifact " + "servers are configured as push remotes."
            )

        for remote in push_remotes:
            message_digest = remote.push_message(message)

        return message_digest

    # link_key():
    #
    # Add a key for an existing artifact.
    #
    # Args:
    #     element (Element): The Element whose artifact is to be linked
    #     oldkey (str): An existing cache key for the artifact
    #     newkey (str): A new cache key for the artifact
    #
    def link_key(self, element, oldkey, newkey):
        oldref = element.get_artifact_name(oldkey)
        newref = element.get_artifact_name(newkey)

        if oldref == newref:
            # The two refs are identical, nothing to do
            return

        utils.safe_link(os.path.join(self._basedir, oldref), os.path.join(self._basedir, newref))

    # fetch_missing_blobs():
    #
    # Fetch missing blobs from configured remote repositories.
    #
    # Args:
    #     project (Project): The current project
    #     missing_blobs (list): The Digests of the blobs to fetch
    #
    # Raises:
    #     (ArtifactError): if some blobs are still missing after all
    #                      remotes have been tried
    #
    def fetch_missing_blobs(self, project, missing_blobs):
        index_remotes, _ = self.get_remotes(project.name, False)
        for remote in index_remotes:
            if not missing_blobs:
                # Everything has been fetched, no need to ask more remotes
                break

            # fetch_blobs() will return the blobs that are still missing
            missing_blobs = self.cas.fetch_blobs(remote, missing_blobs, allow_partial=True)

        if missing_blobs:
            raise ArtifactError("Blobs not found on configured artifact servers")

    # find_missing_blobs():
    #
    # Find missing blobs from configured push remote repositories.
    #
    # Args:
    #     project (Project): The current project
    #     missing_blobs (list): The Digests of the blobs to check
    #
    # Returns:
    #     (list): The Digests of the blobs missing on at least one push remote
    #
    def find_missing_blobs(self, project, missing_blobs):
        if not missing_blobs:
            return []

        _, push_remotes = self.get_remotes(project.name, True)
        remote_missing_blobs_list = []

        for remote in push_remotes:
            remote_missing_blobs = self.cas.missing_blobs(missing_blobs, remote=remote)

            # Merge the per-remote results without duplicates
            for blob in remote_missing_blobs:
                if blob not in remote_missing_blobs_list:
                    remote_missing_blobs_list.append(blob)

        return remote_missing_blobs_list

    # check_remotes_for_element()
    #
    # Check if the element is available in any of the remotes
    #
    # Args:
    #     element (Element): The element to check
    #
    # Returns:
    #     (bool): True if the element is available remotely
    #
    def check_remotes_for_element(self, element):
        project = element._get_project()
        index_remotes, _ = self.get_remotes(project.name, False)

        # If there are no remotes
        if not index_remotes:
            return False

        ref = element.get_artifact_name()
        for remote in index_remotes:
            if self._query_remote(ref, remote):
                return True

        return False

    ################################################
    #             Local Private Methods            #
    ################################################

    # _push_artifact_blobs()
    #
    # Push the blobs that make up an artifact to the remote server.
    #
    # Args:
    #     artifact (Artifact): The artifact whose blobs to push.
    #     artifact_digest (Digest): The digest of the serialized artifact proto.
    #     remote (CASRemote): The remote to push the blobs to.
    #
    # Returns:
    #     (bool) - True if the blobs were sent, False if the remote
    #              reported that its cache is too full.
    #
    # Raises:
    #     ArtifactError: If we fail to push blobs (*unless* they're
    #                    already there or we run out of space on the server).
    #
    def _push_artifact_blobs(self, artifact, artifact_digest, remote):
        artifact_proto = artifact._get_proto()

        try:
            if str(artifact_proto.files):
                self.cas._send_directory(remote, artifact_proto.files)

            if str(artifact_proto.buildtree):
                try:
                    self.cas._send_directory(remote, artifact_proto.buildtree)
                except FileNotFoundError:
                    # The buildtree is not available locally, there is
                    # nothing to send for it.
                    pass

            digests = [artifact_digest, artifact_proto.low_diversity_meta, artifact_proto.high_diversity_meta]

            if str(artifact_proto.public_data):
                digests.append(artifact_proto.public_data)

            for log_file in artifact_proto.logs:
                digests.append(log_file.digest)

            self.cas.send_blobs(remote, digests)

        except CASRemoteError as cas_error:
            if cas_error.reason != "cache-too-full":
                raise ArtifactError(
                    "Failed to push artifact blobs: {}".format(cas_error), temporary=True
                ) from cas_error
            return False

        return True

    # _push_artifact_proto()
    #
    # Pushes the artifact proto to remote.
    #
    # Args:
    #     element (Element): The element
    #     artifact (Artifact): The related artifact being pushed
    #     artifact_digest (Digest): The digest of the serialized artifact proto
    #     remote (AssetRemote): Remote to push to
    #
    # Returns:
    #     (bool): Whether we pushed the artifact.
    #
    # Raises:
    #     ArtifactError: If the push fails for any reason except the
    #                    artifact already existing.
    #
    def _push_artifact_proto(self, element, artifact, artifact_digest, remote):
        artifact_proto = artifact._get_proto()

        # The artifact is published under both its strong and weak key;
        # the two may be identical, hence the deduplication.
        keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
        artifact_names = [element.get_artifact_name(key=key) for key in keys]
        uris = [REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name) for artifact_name in artifact_names]

        try:
            response = remote.fetch_blob(uris)
            # Skip push if artifact is already on the server
            if response and response.blob_digest == artifact_digest:
                return False
        except AssetCacheError as e:
            raise ArtifactError("{}".format(e), temporary=True) from e

        referenced_directories = []
        if artifact_proto.files:
            referenced_directories.append(artifact_proto.files)
        if artifact_proto.buildtree:
            referenced_directories.append(artifact_proto.buildtree)
        if artifact_proto.sources:
            referenced_directories.append(artifact_proto.sources)

        referenced_blobs = [artifact_proto.low_diversity_meta, artifact_proto.high_diversity_meta] + [
            log_file.digest for log_file in artifact_proto.logs
        ]

        try:
            # NOTE(review): the push call itself was missing from the
            # listing. `push_blob()` and its keyword names are assumed from
            # the Remote Asset API; confirm against the AssetRemote class.
            remote.push_blob(
                uris,
                artifact_digest,
                references_blobs=referenced_blobs,
                references_directories=referenced_directories,
            )
        except AssetCacheError as e:
            raise ArtifactError("{}".format(e), temporary=True) from e

        return True

    # _pull_artifact_storage():
    #
    # Pull artifact blobs from the given remote.
    #
    # Args:
    #     element (Element): element to pull
    #     key (str): The specific key for the artifact to pull
    #     artifact_digest (Digest): The digest of the artifact proto to fetch
    #     remote (CASRemote): remote to pull from
    #     pull_buildtrees (bool): whether to pull buildtrees or not
    #
    # Returns:
    #     (bool): True if the artifact was pulled, False if a blob was
    #             not found on the remote.
    #
    # Raises:
    #     ArtifactError: If the pull failed for any reason except the
    #                    blobs not existing on the server.
    #
    def _pull_artifact_storage(self, element, key, artifact_digest, remote, pull_buildtrees=False):
        artifact_name = element.get_artifact_name(key=key)

        try:
            # Fetch and parse artifact proto
            self.cas.fetch_blobs(remote, [artifact_digest])
            artifact = artifact_pb2.Artifact()
            # NOTE(review): the expression opened here was stripped from
            # the listing; `self.cas.objpath()` is assumed to map a digest
            # to the local path of the fetched blob.
            with open(self.cas.objpath(artifact_digest), "rb") as f:
                artifact.ParseFromString(f.read())

            # Write the artifact proto to cache
            artifact_path = os.path.join(self._basedir, artifact_name)
            os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
            with utils.save_file_atomic(artifact_path, mode="wb") as f:
                f.write(artifact.SerializeToString())

            if str(artifact.files):
                self.cas._fetch_directory(remote, artifact.files)

            if pull_buildtrees and str(artifact.buildtree):
                self.cas._fetch_directory(remote, artifact.buildtree)

            digests = [artifact.low_diversity_meta, artifact.high_diversity_meta]
            if str(artifact.public_data):
                digests.append(artifact.public_data)

            for log_digest in artifact.logs:
                digests.append(log_digest.digest)

            self.cas.fetch_blobs(remote, digests)
        except BlobNotFound:
            return False
        except CASRemoteError as e:
            raise ArtifactError("{}".format(e), temporary=True) from e

        return True

    # _query_remote()
    #
    # Args:
    #     ref (str): The artifact ref
    #     remote (AssetRemote): The remote we want to check
    #
    # Returns:
    #     (bool): True if the ref exists in the remote, False otherwise.
    #
    # Raises:
    #     ArtifactError: If the remote reports an error
    #
    def _query_remote(self, ref, remote):
        # NOTE(review): `uri` was used without being assigned in the
        # listing; it is built here the same way pull() builds it.
        uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(ref)

        try:
            response = remote.fetch_blob([uri])
            return bool(response)
        except AssetCacheError as e:
            raise ArtifactError("{}".format(e), temporary=True) from e