#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import hashlib
import itertools
import os
import stat
import errno
import uuid
import contextlib
import grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._protos.buildstream.v2 import buildstream_pb2
from .. import utils
from .._exceptions import CASCacheError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
CACHE_SIZE_FILE = "cache_size"
# CASCacheUsage
#
# A simple object to report the current CAS cache usage details.
#
# Note that this uses the user configured cache quota
# rather than the internal quota with protective headroom
# removed, to provide a more sensible value to display to
# the user.
#
# Args:
# casquota (CASQuota): The CAS quota object to get usage details from
#
class CASCacheUsage():
def __init__(self, casquota):
self.quota_config = casquota._config_cache_quota # Configured quota
self.quota_size = casquota._cache_quota_original # Resolved cache quota in bytes
self.used_size = casquota.get_cache_size() # Size used by artifacts in bytes
self.used_percent = 0 # Percentage of the quota used
if self.quota_size is not None:
self.used_percent = int(self.used_size * 100 / self.quota_size)
# Formats the usage into a human readable string
#
def __str__(self):
return "{} / {} ({}%)" \
.format(utils._pretty_size(self.used_size, dec_places=1),
self.quota_config,
self.used_percent)
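# A hypothetical usage sketch (assumes `casquota` is an existing
# CASQuota instance):
#
#   usage = CASCacheUsage(casquota)
#   print(usage)   # e.g. "1.2 GB / 10G (12%)"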
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
# path (str): The root directory for the CAS repository
#
class CASCache():
def __init__(self, path):
self.casdir = os.path.join(path, 'cas')
self.tmpdir = os.path.join(path, 'tmp')
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
self.__reachable_directory_callbacks = []
self.__reachable_digest_callbacks = []
# preflight():
#
# Preflight check.
#
def preflight(self):
headdir = os.path.join(self.casdir, 'refs', 'heads')
objdir = os.path.join(self.casdir, 'objects')
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))
# contains():
#
# Check whether the specified ref is already available in the local CAS cache.
#
# Args:
# ref (str): The ref to check
#
# Returns: True if the ref is in the cache, False otherwise
#
def contains(self, ref):
refpath = self._refpath(ref)
# This assumes that the repository doesn't have any dangling pointers
return os.path.exists(refpath)
# contains_directory():
#
# Check whether the specified directory and its subdirectories are in
# the cache, i.e. non-dangling.
#
# Args:
# digest (Digest): The directory digest to check
# with_files (bool): Whether to check files as well
#
# Returns: True if the directory is available in the local cache
#
def contains_directory(self, digest, *, with_files):
try:
directory = remote_execution_pb2.Directory()
with open(self.objpath(digest), 'rb') as f:
directory.ParseFromString(f.read())
# Optionally check presence of files
if with_files:
for filenode in directory.files:
if not os.path.exists(self.objpath(filenode.digest)):
return False
# Check subdirectories
for dirnode in directory.directories:
if not self.contains_directory(dirnode.digest, with_files=with_files):
return False
return True
except FileNotFoundError:
return False
# checkout():
#
# Checkout the specified directory digest.
#
# Args:
# dest (str): The destination path
# tree (Digest): The directory digest to extract
# can_link (bool): Whether we can create hard links in the destination
#
def checkout(self, dest, tree, *, can_link=False):
os.makedirs(dest, exist_ok=True)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
# regular file, hard link or copy into place
fullpath = os.path.join(dest, filenode.name)
if can_link:
utils.safe_link(self.objpath(filenode.digest), fullpath)
else:
utils.safe_copy(self.objpath(filenode.digest), fullpath)
if filenode.is_executable:
os.chmod(fullpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
for dirnode in directory.directories:
fullpath = os.path.join(dest, dirnode.name)
self.checkout(fullpath, dirnode.digest, can_link=can_link)
for symlinknode in directory.symlinks:
# symlink
fullpath = os.path.join(dest, symlinknode.name)
os.symlink(symlinknode.target, fullpath)
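# A minimal usage sketch for checkout(), assuming `cascache` is a
# CASCache instance and `ref` was previously committed:
#
#   tree = cascache.resolve_ref(ref)
#   cascache.checkout('/tmp/staging', tree, can_link=True)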
# commit():
#
# Commit directory to cache.
#
# Args:
# refs (list): The refs to set
# path (str): The directory to import
#
def commit(self, refs, path):
tree = self._commit_directory(path)
for ref in refs:
self.set_ref(ref, tree)
# diff():
#
# Return lists of files that have been modified, removed and added
# between the refs described by ref_a and ref_b.
#
# Args:
# ref_a (str): The first ref
# ref_b (str): The second ref
#
# Returns:
# (list, list, list): Lists of modified, removed and added files
#
def diff(self, ref_a, ref_b):
tree_a = self.resolve_ref(ref_a)
tree_b = self.resolve_ref(ref_b)
added = []
removed = []
modified = []
self.diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified)
return modified, removed, added
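# A hypothetical sketch of diff(); the ref names are assumptions:
#
#   modified, removed, added = cascache.diff('element/a/key1', 'element/a/key2')
#   for path in modified:
#       print("modified: {}".format(path))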
# pull():
#
# Pull a ref from a remote repository.
#
# Args:
# ref (str): The ref to pull
# remote (CASRemote): The remote repository to pull from
#
# Returns:
# (bool): True if pull was successful, False if ref was not available
#
def pull(self, ref, remote):
try:
remote.init()
request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
tree = response.digest
# Fetch Directory objects
self._fetch_directory(remote, tree)
# Fetch files, excluded_subdirs determined in pullqueue
required_blobs = self.required_blobs_for_directory(tree)
missing_blobs = self.local_missing_blobs(required_blobs)
if missing_blobs:
self.fetch_blobs(remote, missing_blobs)
self.set_ref(ref, tree)
return True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e
else:
return False
except BlobNotFound:
return False
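# A hypothetical usage sketch for pull(), assuming `remote` is a
# configured CASRemote:
#
#   if cascache.pull(ref, remote):
#       tree = cascache.resolve_ref(ref)
#   # A False return means the ref (or a blob it needs) was not
#   # available on the remote.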
# pull_tree():
#
# Pull a single Tree rather than a ref.
# Does not update local refs.
#
# Args:
# remote (CASRemote): The remote to pull from
# digest (Digest): The digest of the tree
#
def pull_tree(self, remote, digest):
try:
remote.init()
digest = self._fetch_tree(remote, digest)
return digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise
return None
# link_ref():
#
# Add an alias for an existing ref.
#
# Args:
# oldref (str): An existing ref
# newref (str): A new ref for the same directory
#
def link_ref(self, oldref, newref):
tree = self.resolve_ref(oldref)
self.set_ref(newref, tree)
# push():
#
# Push committed refs to remote repository.
#
# Args:
# refs (list): The refs to push
# remote (CASRemote): The remote to push to
#
# Returns:
# (bool): True if any remote was updated, False if no pushes were required
#
# Raises:
# (CASCacheError): if there was an error
#
def push(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)
# Check whether ref is already on the server in which case
# there is no need to push the ref
try:
request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise
self._send_directory(remote, tree)
request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
request.keys.append(ref)
request.digest.CopyFrom(tree)
remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
return not skipped_remote
# objpath():
#
# Return the path of an object based on its digest.
#
# Args:
# digest (Digest): The digest of the object
#
# Returns:
# (str): The path of the object
#
def objpath(self, digest):
return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
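# For illustration: an object whose digest hash is the 64 hex character
# string "aabb...ff" is stored at:
#
#   <casdir>/objects/aa/bb...ff
#
# i.e. the first two hex characters name the fan-out subdirectory and
# the remaining 62 name the file.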
# add_object():
#
# Hash and write object to CAS.
#
# Args:
# digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
# link_directly (bool): Whether file given by path can be linked
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
if digest is None:
digest = remote_execution_pb2.Digest()
try:
h = hashlib.sha256()
# Always write out new file to avoid corruption if input file is modified
with contextlib.ExitStack() as stack:
if path is not None and link_directly:
tmp = stack.enter_context(open(path, 'rb'))
for chunk in iter(lambda: tmp.read(_BUFFER_SIZE), b""):
h.update(chunk)
else:
tmp = stack.enter_context(self._temporary_object())
if path:
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(_BUFFER_SIZE), b""):
h.update(chunk)
tmp.write(chunk)
else:
h.update(buffer)
tmp.write(buffer)
tmp.flush()
digest.hash = h.hexdigest()
digest.size_bytes = os.fstat(tmp.fileno()).st_size
# Place file at final location
objpath = self.objpath(digest)
os.makedirs(os.path.dirname(objpath), exist_ok=True)
os.link(tmp.name, objpath)
except FileExistsError:
# We can ignore the failed link() if the object is already in the repo.
pass
except OSError as e:
raise CASCacheError("Failed to hash object: {}".format(e)) from e
return digest
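# A minimal sketch of the two calling conventions (exactly one of
# `path` or `buffer` may be given):
#
#   digest = cascache.add_object(buffer=b"hello")
#   digest = cascache.add_object(path='/path/to/file', link_directly=True)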
# set_ref():
#
# Create or replace a ref.
#
# Args:
# ref (str): The name of the ref
# tree (Digest): The digest to store in the ref
#
def set_ref(self, ref, tree):
refpath = self._refpath(ref)
os.makedirs(os.path.dirname(refpath), exist_ok=True)
with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
f.write(tree.SerializeToString())
# resolve_ref():
#
# Resolve a ref to a digest.
#
# Args:
# ref (str): The name of the ref
# update_mtime (bool): Whether to update the mtime of the ref
#
# Returns:
# (Digest): The digest stored in the ref
#
def resolve_ref(self, ref, *, update_mtime=False):
refpath = self._refpath(ref)
try:
with open(refpath, 'rb') as f:
if update_mtime:
os.utime(refpath)
digest = remote_execution_pb2.Digest()
digest.ParseFromString(f.read())
return digest
except FileNotFoundError as e:
raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
# update_mtime()
#
# Update the mtime of a ref.
#
# Args:
# ref (str): The ref to update
#
def update_mtime(self, ref):
try:
os.utime(self._refpath(ref))
except FileNotFoundError as e:
raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
# list_objects():
#
# List cached objects in Least Recently Modified (LRM) order.
#
# Returns:
# (list): A list of (mtime, path) tuples in LRM order
#
def list_objects(self):
objs = []
mtimes = []
for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
for filename in files:
obj_path = os.path.join(root, filename)
try:
mtimes.append(os.path.getmtime(obj_path))
except FileNotFoundError:
pass
else:
objs.append(obj_path)
# NOTE: Sorted will sort from earliest to latest, thus the
# first element of this list will be the file modified earliest.
return sorted(zip(mtimes, objs))
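# clean_up_refs_until():
#
# Remove all refs last modified before the given time.
#
# Args:
# time (float): The timestamp threshold; refs with an older mtime are unlinked
#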
def clean_up_refs_until(self, time):
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
for root, _, files in os.walk(ref_heads):
for filename in files:
ref_path = os.path.join(root, filename)
# Obtain the mtime (the time a file was last modified)
if os.path.getmtime(ref_path) < time:
os.unlink(ref_path)
# remove():
#
# Removes the given symbolic ref from the repo.
#
# Args:
# ref (str): A symbolic ref
# basedir (str): Path of base directory the ref is in, defaults to
# CAS refs heads
# defer_prune (bool): Whether to defer pruning to the caller. NOTE:
# The space won't be freed until you manually
# call prune.
#
# Returns:
# (int|None): The amount of space pruned from the repository in
# bytes, or None if defer_prune is True
#
def remove(self, ref, *, basedir=None, defer_prune=False):
if basedir is None:
basedir = os.path.join(self.casdir, 'refs', 'heads')
# Remove cache ref
self._remove_ref(ref, basedir)
if not defer_prune:
pruned = self.prune()
return pruned
return None
# Adds a callback returning an iterator over reachable directory digests
def add_reachable_directories_callback(self, callback):
self.__reachable_directory_callbacks.append(callback)
# Adds a callback returning an iterator over reachable file digests
def add_reachable_digests_callback(self, callback):
self.__reachable_digest_callbacks.append(callback)
# prune():
#
# Prune unreachable objects from the repo.
#
def prune(self):
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
pruned = 0
reachable = set()
# Check which objects are reachable
for root, _, files in os.walk(ref_heads):
for filename in files:
ref_path = os.path.join(root, filename)
ref = os.path.relpath(ref_path, ref_heads)
tree = self.resolve_ref(ref)
self._reachable_refs_dir(reachable, tree)
# check callback directory digests that are reachable
for digest_callback in self.__reachable_directory_callbacks:
for digest in digest_callback():
self._reachable_refs_dir(reachable, digest)
# check callback file digests that are reachable
for digest_callback in self.__reachable_digest_callbacks:
for digest in digest_callback():
reachable.add(digest.hash)
# Prune unreachable objects
for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
for filename in files:
objhash = os.path.basename(root) + filename
if objhash not in reachable:
obj_path = os.path.join(root, filename)
pruned += os.stat(obj_path).st_size
os.unlink(obj_path)
return pruned
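# A hypothetical sketch of deferred pruning; remove() with
# defer_prune=True returns None, and a later prune() call reports the
# bytes actually reclaimed:
#
#   cascache.remove('element/a/key1', defer_prune=True)
#   freed = cascache.prune()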
def update_tree_mtime(self, tree):
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
# remote_missing_blobs_for_directory():
#
# Determine which blobs of a directory tree are missing on the remote.
#
# Args:
# remote (CASRemote): The remote repository to check
# digest (Digest): The directory digest
#
# Returns: List of missing Digest objects
#
def remote_missing_blobs_for_directory(self, remote, digest):
required_blobs = self.required_blobs_for_directory(digest)
return self.remote_missing_blobs(remote, required_blobs)
# remote_missing_blobs():
#
# Determine which blobs are missing on the remote.
#
# Args:
# remote (CASRemote): The remote repository to check
# blobs (list): The Digests of blobs to check
#
# Returns: List of missing Digest objects
#
def remote_missing_blobs(self, remote, blobs):
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.CopyFrom(required_digest)
response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.CopyFrom(missing_digest)
missing_blobs[d.hash] = d
return missing_blobs.values()
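# A minimal sketch of how the methods above combine, assuming `remote`
# is a configured CASRemote and `tree` a local directory digest:
#
#   required = cascache.required_blobs_for_directory(tree)
#   missing = cascache.remote_missing_blobs(remote, required)
#
# Digests are queried 512 per FindMissingBlobs request to bound the
# request size, and de-duplicated by hash in the returned collection.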
# local_missing_blobs():
#
# Check local cache for missing blobs.
#
# Args:
# digests (list): The Digests of blobs to check
#
# Returns: Missing Digest objects
#
def local_missing_blobs(self, digests):
missing_blobs = []
for digest in digests:
objpath = self.objpath(digest)
if not os.path.exists(objpath):
missing_blobs.append(digest)
return missing_blobs
# required_blobs_for_directory():
#
# Generator that returns the Digests of all blobs in the tree specified by
# the Digest of the toplevel Directory object.
#
def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None):
if not excluded_subdirs:
excluded_subdirs = []
# parse directory, and recursively add blobs
yield directory_digest
directory = remote_execution_pb2.Directory()
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
yield filenode.digest
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
yield from self.required_blobs_for_directory(dirnode.digest)
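# A minimal usage sketch; the generator yields the toplevel Directory
# digest first, then file digests, recursing into subdirectories:
#
#   for blob in cascache.required_blobs_for_directory(tree):
#       print(blob.hash, blob.size_bytes)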
def diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
dir_a = remote_execution_pb2.Directory()
dir_b = remote_execution_pb2.Directory()
if tree_a:
with open(self.objpath(tree_a), 'rb') as f:
dir_a.ParseFromString(f.read())
if tree_b:
with open(self.objpath(tree_b), 'rb') as f:
dir_b.ParseFromString(f.read())
a = 0
b = 0
while a < len(dir_a.files) or b < len(dir_b.files):
if b < len(dir_b.files) and (a >= len(dir_a.files) or
dir_a.files[a].name > dir_b.files[b].name):
added.append(os.path.join(path, dir_b.files[b].name))
b += 1
elif a < len(dir_a.files) and (b >= len(dir_b.files) or
dir_b.files[b].name > dir_a.files[a].name):
removed.append(os.path.join(path, dir_a.files[a].name))
a += 1
else:
# File exists in both directories
if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
modified.append(os.path.join(path, dir_a.files[a].name))
a += 1
b += 1
a = 0
b = 0
while a < len(dir_a.directories) or b < len(dir_b.directories):
if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
dir_a.directories[a].name > dir_b.directories[b].name):
self.diff_trees(None, dir_b.directories[b].digest,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_b.directories[b].name))
b += 1
elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
dir_b.directories[b].name > dir_a.directories[a].name):
self.diff_trees(dir_a.directories[a].digest, None,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_a.directories[a].name))
a += 1
else:
# Subdirectory exists in both directories
if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
self.diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_a.directories[a].name))
a += 1
b += 1
################################################
# Local Private Methods #
################################################
def _refpath(self, ref):
return os.path.join(self.casdir, 'refs', 'heads', ref)
# _remove_ref()
#
# Removes a ref.
#
# This also takes care of pruning away directories which can
# be removed after having removed the given ref.
#
# Args:
# ref (str): The ref to remove
# basedir (str): Path of base directory the ref is in
#
# Raises:
# (CASCacheError): If the ref didn't exist, or a system error
# occurred while removing it
#
def _remove_ref(self, ref, basedir):
# Remove the ref itself
refpath = os.path.join(basedir, ref)
try:
os.unlink(refpath)
except FileNotFoundError as e:
raise CASCacheError("Could not find ref '{}'".format(ref)) from e
# Now remove any leading directories
components = list(os.path.split(ref))
while components:
components.pop()
refdir = os.path.join(basedir, *components)
# Break out once we reach the base
if refdir == basedir:
break
try:
os.rmdir(refdir)
except FileNotFoundError:
# The parent directory did not exist, but its
# parent directory might still be ready to prune
pass
except OSError as e:
if e.errno == errno.ENOTEMPTY:
# The parent directory was not empty, so we
# cannot prune directories beyond this point
break
# Something went wrong here
raise CASCacheError("System error while removing ref '{}': {}".format(ref, e)) from e
# _commit_directory():
#
# Adds local directory to content addressable store.
#
# Adds files, symbolic links and recursively other directories in
# a local directory to the content addressable store.
#
# Args:
# path (str): Path to the directory to add.
# dir_digest (Digest): An optional Digest object to use.
#
# Returns:
# (Digest): Digest object for the directory added.
#
def _commit_directory(self, path, *, dir_digest=None):
directory = remote_execution_pb2.Directory()
for name in sorted(os.listdir(path)):
full_path = os.path.join(path, name)
mode = os.lstat(full_path).st_mode
if stat.S_ISDIR(mode):
dirnode = directory.directories.add()
dirnode.name = name
self._commit_directory(full_path, dir_digest=dirnode.digest)
elif stat.S_ISREG(mode):
filenode = directory.files.add()
filenode.name = name
self.add_object(path=full_path, digest=filenode.digest)
filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
elif stat.S_ISLNK(mode):
symlinknode = directory.symlinks.add()
symlinknode.name = name
symlinknode.target = os.readlink(full_path)
elif stat.S_ISSOCK(mode):
# The process serving the socket can't be cached anyway
pass
else:
raise CASCacheError("Unsupported file type for {}".format(full_path))
return self.add_object(digest=dir_digest,
buffer=directory.SerializeToString())
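# A hypothetical sketch; commit() is the public wrapper that calls this
# and then sets refs on the resulting tree digest:
#
#   tree = cascache._commit_directory('/path/to/import')
#   cascache.set_ref('element/a/key1', tree)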
def _get_subdir(self, tree, subdir):
head, name = os.path.split(subdir)
if head:
tree = self._get_subdir(tree, head)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
for dirnode in directory.directories:
if dirnode.name == name:
return dirnode.digest
raise CASCacheError("Subdirectory {} not found".format(name))
def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
if tree.hash in reachable:
return
try:
if update_mtime:
os.utime(self.objpath(tree))
reachable.add(tree.hash)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
except FileNotFoundError:
# Just exit early if the file doesn't exist
return
for filenode in directory.files:
if update_mtime:
os.utime(self.objpath(filenode.digest))
if check_exists:
if not os.path.exists(self.objpath(filenode.digest)):
raise FileNotFoundError
reachable.add(filenode.digest.hash)
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
# _temporary_object():
#
# Create a named temporary file with 0o644 access rights.
#
# Returns:
# (file): A file object to a named temporary file.
#
@contextlib.contextmanager
def _temporary_object(self):
with utils._tempnamedfile(dir=self.tmpdir) as f:
os.chmod(f.name,
stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
yield f
# _ensure_blob():
#
# Fetch and add blob if it's not already local.
#
# Args:
# remote (Remote): The remote to use.
# digest (Digest): Digest object for the blob to fetch.
#
# Returns:
# (str): The path of the object
#
def _ensure_blob(self, remote, digest):
objpath = self.objpath(digest)
if os.path.exists(objpath):
# already in local repository
return objpath
with self._temporary_object() as f:
remote._fetch_blob(digest, f)
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
return objpath
def _batch_download_complete(self, batch, *, missing_blobs=None):
for digest, data in batch.send(missing_blobs=missing_blobs):
with self._temporary_object() as f:
f.write(data)
f.flush()
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
# Helper function for _fetch_directory().
def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
self._batch_download_complete(batch)
# All previously scheduled directories are now locally available,
# move them to the processing queue.
fetch_queue.extend(fetch_next_queue)
fetch_next_queue.clear()
return _CASBatchRead(remote)
# Helper function for _fetch_directory().
def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
in_local_cache = os.path.exists(self.objpath(digest))
if in_local_cache:
# Skip download, already in local cache.
pass
elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_read_supported):
# Too large for batch request, download in independent request.
self._ensure_blob(remote, digest)
in_local_cache = True
else:
if not batch.add(digest):
# Not enough space left in batch request.
# Complete pending batch first.
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
batch.add(digest)
if recursive:
if in_local_cache:
# Add directory to processing queue.
fetch_queue.append(digest)
else:
# Directory will be available after completing pending batch.
# Add directory to deferred processing queue.
fetch_next_queue.append(digest)
return batch
# _fetch_directory():
#
# Fetches remote directory and adds it to content addressable store.
#
# This recursively fetches directory objects but doesn't fetch any
# files.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
#
def _fetch_directory(self, remote, dir_digest):
# TODO Use GetTree() if the server supports it
fetch_queue = [dir_digest]
fetch_next_queue = []
batch = _CASBatchRead(remote)
while len(fetch_queue) + len(fetch_next_queue) > 0:
if not fetch_queue:
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
dir_digest = fetch_queue.pop(0)
objpath = self._ensure_blob(remote, dir_digest)
directory = remote_execution_pb2.Directory()
with open(objpath, 'rb') as f:
directory.ParseFromString(f.read())
for dirnode in directory.directories:
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
def _fetch_tree(self, remote, digest):
# download but do not store the Tree object
with utils._tempnamedfile(dir=self.tmpdir) as out:
remote._fetch_blob(digest, out)
tree = remote_execution_pb2.Tree()
with open(out.name, 'rb') as f:
tree.ParseFromString(f.read())
tree.children.extend([tree.root])
for directory in tree.children:
dirbuffer = directory.SerializeToString()
dirdigest = self.add_object(buffer=dirbuffer)
assert dirdigest.size_bytes == len(dirbuffer)
return dirdigest
# fetch_blobs():
#
# Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
#
# Args:
# remote (CASRemote): The remote repository to fetch from
# digests (list): The Digests of blobs to fetch
#
# Returns: The Digests of the blobs that were not available on the remote CAS
#
def fetch_blobs(self, remote, digests):
missing_blobs = []
batch = _CASBatchRead(remote)
for digest in digests:
if (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_read_supported):
# Too large for batch request, download in independent request.
try:
self._ensure_blob(remote, digest)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
missing_blobs.append(digest)
else:
raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
else:
if not batch.add(digest):
# Not enough space left in batch request.
# Complete pending batch first.
self._batch_download_complete(batch, missing_blobs=missing_blobs)
batch = _CASBatchRead(remote)
batch.add(digest)
# Complete last pending batch
self._batch_download_complete(batch, missing_blobs=missing_blobs)
return missing_blobs
# send_blobs():
#
# Upload blobs to remote CAS.
#
# Args:
# remote (CASRemote): The remote repository to upload to
# digests (list): The Digests of blobs to upload
#
def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
batch = _CASBatchUpdate(remote)
for digest in digests:
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
if (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_update_supported):
# Too large for batch request, upload in independent request.
remote._send_blob(digest, f, u_uid=u_uid)
else:
if not batch.add(digest, f):
# Not enough space left in batch request.
# Complete pending batch first.
batch.send()
batch = _CASBatchUpdate(remote)
batch.add(digest, f)
# Send final batch
batch.send()
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
# Upload any blobs missing on the server
self.send_blobs(remote, missing_blobs, u_uid)
class CASQuota:
def __init__(self, context):
self.context = context
self.cas = context.get_cascache()
self.casdir = self.cas.casdir
self._config_cache_quota = context.config_cache_quota
self._config_cache_quota_string = context.config_cache_quota_string
self._cache_size = None # The current cache size, sometimes it's an estimate
self._cache_quota = None # The cache quota
self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_quota_headroom = None # The headroom in bytes before reaching the quota or full disk
self._cache_lower_threshold = None # The target cache size for a cleanup
self.available_space = None
self._message = context.message
self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method
self._list_refs_callbacks = [] # Callbacks to all refs
self._calculate_cache_quota()
# compute_cache_size()
#
# Computes the real artifact cache size.
#
# Returns:
# (int): The size of the artifact cache.
#
def compute_cache_size(self):
self._cache_size = utils._get_dir_size(self.casdir)
return self._cache_size
# get_cache_size()
#
# Fetches the cached size of the cache, this is sometimes
# an estimate and periodically adjusted to the real size
# when a cache size calculation job runs.
#
# When it is an estimate, the value is either correct, or
# it is greater than the actual cache size.
#
# Returns:
# (int) An approximation of the artifact cache size, in bytes.
#
def get_cache_size(self):
# If we don't currently have an estimate, figure out the real cache size.
if self._cache_size is None:
stored_size = self._read_cache_size()
if stored_size is not None:
self._cache_size = stored_size
else:
self.compute_cache_size()
return self._cache_size
# set_cache_size()
#
# Forcefully set the overall cache size.
#
# This is used to update the size in the main process after
# having calculated in a cleanup or a cache size calculation job.
#
# Args:
# cache_size (int): The size to set.
# write_to_disk (bool): Whether to write the value to disk.
#
def set_cache_size(self, cache_size, *, write_to_disk=True):
assert cache_size is not None
self._cache_size = cache_size
if write_to_disk:
self._write_cache_size(self._cache_size)
# full()
#
# Checks if the artifact cache is full, either
# because the user configured quota has been exceeded
# or because the underlying disk is almost full.
#
# Returns:
# (bool): True if the artifact cache is full
#
def full(self):
if self.get_cache_size() > self._cache_quota:
return True
_, volume_avail = self._get_cache_volume_size()
if volume_avail < self._cache_quota_headroom:
return True
return False
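# A minimal sketch of how a caller might react to a full cache
# (clean() is defined below):
#
#   if casquota.full():
#       casquota.clean()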
# add_remove_callbacks()
#
# Registers a pair of callbacks used during cleanup: an iterator over
# unrequired objects (currently artifact and source refs), and a method
# to remove them.
#
# Args:
# list_unrequired (callable): Returns an iterator of (mtime, ref) tuples
# remove_method (callable): Removes the given ref, returning freed bytes
#
def add_remove_callbacks(self, list_unrequired, remove_method):
self._remove_callbacks.append((list_unrequired, remove_method))
def add_list_refs_callback(self, list_callback):
self._list_refs_callbacks.append(list_callback)
################################################
# Local Private Methods #
################################################
# _read_cache_size()
#
# Reads and returns the size of the artifact cache that's stored in the
# cache's size file
#
# Returns:
# (int): The size of the artifact cache, as recorded in the file
#
def _read_cache_size(self):
size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
if not os.path.exists(size_file_path):
return None
with open(size_file_path, "r") as f:
size = f.read()
try:
num_size = int(size)
except ValueError as e:
raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format(
size, size_file_path)) from e
return num_size
# _write_cache_size()
#
# Writes the given size of the artifact cache to the cache's size file
#
# Args:
# size (int): The size of the artifact cache to record
#
def _write_cache_size(self, size):
assert isinstance(size, int)
size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
with utils.save_file_atomic(size_file_path, "w", tempdir=self.cas.tmpdir) as f:
f.write(str(size))
# _get_cache_volume_size()
#
# Get the available space and total space for the volume on
# which the artifact cache is located.
#
# Returns:
# (int): The total number of bytes on the volume
# (int): The number of available bytes on the volume
#
# NOTE: We use this stub to allow the test cases
# to override what an artifact cache thinks
# about its disk size and available bytes.
#
def _get_cache_volume_size(self):
return utils._get_volume_size(self.casdir)
# _calculate_cache_quota()
#
# Calculates and sets the cache quota and lower threshold based on the
# quota set in Context.
# It checks that the quota is both a valid expression and that there is
# enough disk space to satisfy that quota.
#
def _calculate_cache_quota(self):
# Headroom intended to give BuildStream a bit of leeway.
# This acts as the minimum size of cache_quota and also
# is taken from the user requested cache_quota.
#
if 'BST_TEST_SUITE' in os.environ:
self._cache_quota_headroom = 0
else:
self._cache_quota_headroom = 2e9
total_size, available_space = self._get_cache_volume_size()
cache_size = self.get_cache_size()
self.available_space = available_space
# Ensure system has enough storage for the cache_quota
#
# If cache_quota is None, set it to the maximum it could possibly be.
#
# Also check that cache_quota is at least as large as our headroom.
#
cache_quota = self._config_cache_quota
if cache_quota is None:
# The user has set no limit, so we may take all the space.
cache_quota = min(cache_size + available_space, total_size)
if cache_quota < self._cache_quota_headroom: # Check minimum
raise LoadError(
LoadErrorReason.INVALID_DATA,
"Invalid cache quota ({}): BuildStream requires a minimum cache quota of {}.".format(
utils._pretty_size(cache_quota),
utils._pretty_size(self._cache_quota_headroom)))
elif cache_quota > total_size:
# A quota greater than the total disk size is certainly an error
raise CASCacheError("Your system does not have enough available " +
"space to support the cache quota specified.",
detail=("You have specified a quota of {quota} total disk space.\n" +
"The filesystem containing {local_cache_path} only " +
"has {total_size} total disk space.")
.format(
quota=self._config_cache_quota,
local_cache_path=self.casdir,
total_size=utils._pretty_size(total_size)),
reason='insufficient-storage-for-quota')
elif cache_quota > cache_size + available_space:
# The quota does not fit in the available space, this is a warning
if '%' in self._config_cache_quota_string:
available = (available_space / total_size) * 100
available = '{}% of total disk space'.format(round(available, 1))
else:
available = utils._pretty_size(available_space)
self._message(Message(
None,
MessageType.WARN,
"Your system does not have enough available " +
"space to support the cache quota specified.",
detail=("You have specified a quota of {quota} total disk space.\n" +
"The filesystem containing {local_cache_path} only " +
"has {available_size} available.")
.format(quota=self._config_cache_quota,
local_cache_path=self.casdir,
available_size=available)))
# Place a slight headroom (2e9, i.e. 2GB) on the cache_quota
# to try and avoid exceptions.
#
# Of course, we might still end up running out during a build
# if we end up writing more than 2G, but hey, this stuff is
# already really fuzzy.
#
self._cache_quota_original = cache_quota
self._cache_quota = cache_quota - self._cache_quota_headroom
self._cache_lower_threshold = self._cache_quota / 2
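# A worked example with hypothetical numbers: for a configured quota of
# 10e9 bytes and the default 2e9 headroom, the internal quota becomes
# 10e9 - 2e9 = 8e9 bytes and a cleanup targets 8e9 / 2 = 4e9 bytes.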
# clean():
#
# Clean the artifact cache as much as possible.
#
# Args:
# progress (callable): A callback to call when a ref is removed
#
# Returns:
# (int): The size of the cache after having cleaned up
#
def clean(self, progress=None):
context = self.context
# Some cumulative statistics
removed_ref_count = 0
space_saved = 0
total_refs = 0
for refs in self._list_refs_callbacks:
total_refs += len(list(refs()))
# Start off with an announcement with as much info as possible
volume_size, volume_avail = self._get_cache_volume_size()
self._message(Message(
None, MessageType.STATUS, "Starting cache cleanup",
detail=("Elements required by the current build plan:\n" + "{}\n" +
"User specified quota: {} ({})\n" +
"Cache usage: {}\n" +
"Cache volume: {} total, {} available")
.format(
total_refs,
context.config_cache_quota,
utils._pretty_size(self._cache_quota, dec_places=2),
utils._pretty_size(self.get_cache_size(), dec_places=2),
utils._pretty_size(volume_size, dec_places=2),
utils._pretty_size(volume_avail, dec_places=2))))
# Do a real computation of the cache size once, just in case
self.compute_cache_size()
usage = CASCacheUsage(self)
self._message(Message(None, MessageType.STATUS,
"Cache usage recomputed: {}".format(usage)))
# Collect digests and their remove method
all_unrequired_refs = []
for (unrequired_refs, remove) in self._remove_callbacks:
for (mtime, ref) in unrequired_refs():
all_unrequired_refs.append((mtime, ref, remove))
# Pair refs and their remove method sorted in time order
all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)]
# Go through unrequired refs and remove them, oldest first
made_space = False
for (ref, remove) in all_unrequired_refs:
size = remove(ref)
removed_ref_count += 1
space_saved += size
self._message(Message(
None, MessageType.STATUS,
"Freed {: <7} {}".format(
utils._pretty_size(size, dec_places=2),
ref)))
self.set_cache_size(self._cache_size - size)
# User callback
#
# Currently this process is fairly slow, but we should
# think about throttling this progress() callback if this
# becomes too intense.
if progress:
progress()
if self.get_cache_size() < self._cache_lower_threshold:
made_space = True
break
if not made_space and self.full():
# If too many artifacts are required, and we therefore
# can't remove them, we have to abort the build.
#
# FIXME: Asking the user what to do may be neater
#
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
'buildstream.conf')
detail = ("Aborted after removing {} refs and saving {} disk space.\n"
"The remaining {} in the cache is required by the {} references in your build plan\n\n"
"There is not enough space to complete the build.\n"
"Please increase the cache-quota in {} and/or make more disk space."
.format(removed_ref_count,
utils._pretty_size(space_saved, dec_places=2),
utils._pretty_size(self.get_cache_size(), dec_places=2),
total_refs,
(context.config_origin or default_conf)))
raise CASCacheError("Cache too full. Aborting.",
detail=detail,
reason="cache-too-full")
# Informational message about the side effects of the cleanup
self._message(Message(
None, MessageType.INFO, "Cleanup completed",
detail=("Removed {} refs and saving {} disk space.\n" +
"Cache usage is now: {}")
.format(removed_ref_count,
utils._pretty_size(space_saved, dec_places=2),
utils._pretty_size(self.get_cache_size(), dec_places=2))))
return self.get_cache_size()
def _grouper(iterable, n):
while True:
try:
current = next(iterable)
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))
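# _grouper() splits an iterator into chained sub-iterators of at most n
# items each without materialising the whole sequence. A minimal sketch:
#
#   groups = _grouper(iter(range(5)), 2)
#   [list(g) for g in groups]   # [[0, 1], [2, 3], [4]]
#
# Note that it requires an iterator rather than a bare iterable, since
# each group shares iteration state with the source.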