#
# Copyright (C) 2018 Codethink Limited
# Copyright (C) 2018-2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import itertools
import os
import stat
import errno
import contextlib
import ctypes
import multiprocessing
import shutil
import signal
import subprocess
import tempfile
import time
import grpc
from .._protos.google.rpc import code_pb2
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
from .. import _signals, utils
from ..types import FastEnum
from .._exceptions import CASCacheError
from .._message import Message, MessageType
from .casremote import _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
# Refresh interval for disk usage of local cache in seconds
_CACHE_USAGE_REFRESH = 5
_CASD_MAX_LOGFILES = 10
class CASLogLevel(FastEnum):
WARNING = "warning"
INFO = "info"
TRACE = "trace"
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
# path (str): The root directory for the CAS repository
# casd (bool): True to spawn buildbox-casd (default) or False (testing only)
# cache_quota (int): User configured cache quota
# protect_session_blobs (bool): Disable expiry for blobs used in the current session
# log_level (LogLevel): Log level to give to buildbox-casd for logging
#
class CASCache():
def __init__(
self, path, *, casd=True, cache_quota=None, protect_session_blobs=True, log_level=CASLogLevel.WARNING
):
self.casdir = os.path.join(path, 'cas')
self.tmpdir = os.path.join(path, 'tmp')
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
if casd:
# Place socket in global/user temporary directory to avoid hitting
# the socket path length limit.
self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
casd_args = [utils.get_host_tool('buildbox-casd')]
casd_args.append('--bind=unix:' + self._casd_socket_path)
casd_args.append('--log-level=' + log_level.value)
if cache_quota is not None:
casd_args.append('--quota-high={}'.format(int(cache_quota)))
casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
if protect_session_blobs:
casd_args.append('--protect-session-blobs')
casd_args.append(path)
self._casd_start_time = time.time()
self.casd_logfile = self._rotate_and_get_next_logfile()
with open(self.casd_logfile, "w") as logfile_fp:
# Block SIGINT on buildbox-casd, we don't need to stop it
# The frontend will take care of it if needed
with _signals.blocked([signal.SIGINT], ignore=False):
self._casd_process = subprocess.Popen(
casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
else:
self._casd_process = None
self._casd_channel = None
self._casd_cas = None
self._local_cas = None
self._cache_usage_monitor = None
def __getstate__(self):
state = self.__dict__.copy()
        # Popen objects are not pickle-able, however, child processes only
        # need to know whether a casd subprocess was started or not.
assert '_casd_process' in state
state['_casd_process'] = bool(self._casd_process)
return state
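    # _init_casd():
    #
    # Establish the gRPC connection to the spawned buildbox-casd process,
    # retrying GetCapabilities() until the daemon is ready to accept requests.
    #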
def _init_casd(self):
assert self._casd_process, "CASCache was instantiated without buildbox-casd"
if not self._casd_channel:
self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
# Call GetCapabilities() to establish connection to casd
capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
while True:
try:
capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
break
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
# casd is not ready yet, try again after a 10ms delay,
# but don't wait for more than 15s
if time.time() < self._casd_start_time + 15:
time.sleep(1 / 100)
continue
raise
# _get_cas():
#
# Return ContentAddressableStorage stub for buildbox-casd channel.
#
def _get_cas(self):
if not self._casd_cas:
self._init_casd()
return self._casd_cas
# _get_local_cas():
#
# Return LocalCAS stub for buildbox-casd channel.
#
def _get_local_cas(self):
if not self._local_cas:
self._init_casd()
return self._local_cas
# preflight():
#
# Preflight check.
#
def preflight(self):
headdir = os.path.join(self.casdir, 'refs', 'heads')
objdir = os.path.join(self.casdir, 'objects')
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))
# has_open_grpc_channels():
#
# Return whether there are gRPC channel instances. This is used to safeguard
# against fork() with open gRPC channels.
#
def has_open_grpc_channels(self):
return bool(self._casd_channel)
# close_grpc_channels():
#
# Close the casd channel if it exists
#
def close_grpc_channels(self):
if self._casd_channel:
self._local_cas = None
self._casd_cas = None
self._casd_channel.close()
self._casd_channel = None
# release_resources():
#
# Release resources used by CASCache.
#
def release_resources(self, messenger=None):
if self._cache_usage_monitor:
self._cache_usage_monitor.release_resources()
if self._casd_process:
self.close_grpc_channels()
self._terminate_casd_process(messenger)
shutil.rmtree(self._casd_socket_tempdir)
# contains():
#
# Check whether the specified ref is already available in the local CAS cache.
#
# Args:
# ref (str): The ref to check
#
# Returns: True if the ref is in the cache, False otherwise
#
def contains(self, ref):
refpath = self._refpath(ref)
# This assumes that the repository doesn't have any dangling pointers
return os.path.exists(refpath)
# contains_file():
#
# Check whether a digest corresponds to a file which exists in CAS
#
# Args:
# digest (Digest): The file digest to check
#
# Returns: True if the file is in the cache, False otherwise
#
def contains_file(self, digest):
return os.path.exists(self.objpath(digest))
# contains_directory():
#
# Check whether the specified directory and subdirectories are in the cache,
    # i.e. not dangling.
#
# Args:
# digest (Digest): The directory digest to check
# with_files (bool): Whether to check files as well
# update_mtime (bool): Whether to update the timestamp
#
# Returns: True if the directory is available in the local cache
#
def contains_directory(self, digest, *, with_files, update_mtime=False):
try:
directory = remote_execution_pb2.Directory()
path = self.objpath(digest)
with open(path, 'rb') as f:
directory.ParseFromString(f.read())
if update_mtime:
os.utime(f.fileno())
# Optionally check presence of files
if with_files:
for filenode in directory.files:
path = self.objpath(filenode.digest)
if update_mtime:
# No need for separate `exists()` call as this will raise
# FileNotFoundError if the file does not exist.
os.utime(path)
elif not os.path.exists(path):
return False
# Check subdirectories
for dirnode in directory.directories:
if not self.contains_directory(dirnode.digest, with_files=with_files, update_mtime=update_mtime):
return False
return True
except FileNotFoundError:
return False
# checkout():
#
# Checkout the specified directory digest.
#
# Args:
# dest (str): The destination path
# tree (Digest): The directory digest to extract
# can_link (bool): Whether we can create hard links in the destination
#
def checkout(self, dest, tree, *, can_link=False):
os.makedirs(dest, exist_ok=True)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
            # regular file, hardlink or copy it into place
fullpath = os.path.join(dest, filenode.name)
if can_link:
utils.safe_link(self.objpath(filenode.digest), fullpath)
else:
utils.safe_copy(self.objpath(filenode.digest), fullpath)
if filenode.is_executable:
os.chmod(fullpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
for dirnode in directory.directories:
fullpath = os.path.join(dest, dirnode.name)
self.checkout(fullpath, dirnode.digest, can_link=can_link)
for symlinknode in directory.symlinks:
# symlink
fullpath = os.path.join(dest, symlinknode.name)
os.symlink(symlinknode.target, fullpath)
# diff():
#
    # Return the lists of files that have been modified, removed or added
    # between the refs described by ref_a and ref_b.
    #
    # Args:
    #     ref_a (str): The first ref
    #     ref_b (str): The second ref
    #
    # Returns:
    #     (list), (list), (list): Lists of modified, removed and added file paths
#
def diff(self, ref_a, ref_b):
tree_a = self.resolve_ref(ref_a)
tree_b = self.resolve_ref(ref_b)
added = []
removed = []
modified = []
self.diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified)
return modified, removed, added
# pull_tree():
#
# Pull a single Tree rather than a ref.
# Does not update local refs.
#
# Args:
# remote (CASRemote): The remote to pull from
# digest (Digest): The digest of the tree
#
def pull_tree(self, remote, digest):
try:
remote.init()
digest = self._fetch_tree(remote, digest)
return digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise
return None
# objpath():
#
# Return the path of an object based on its digest.
#
# Args:
# digest (Digest): The digest of the object
#
# Returns:
# (str): The path of the object
#
def objpath(self, digest):
return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
# add_object():
#
# Hash and write object to CAS.
#
# Args:
# digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
# link_directly (bool): Whether file given by path can be linked
# instance_name (str): casd instance_name for remote CAS
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, instance_name=None):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
# If we're linking directly, then path must be specified.
assert (not link_directly) or (link_directly and path)
if digest is None:
digest = remote_execution_pb2.Digest()
with contextlib.ExitStack() as stack:
if path is None:
tmp = stack.enter_context(self._temporary_object())
tmp.write(buffer)
tmp.flush()
path = tmp.name
request = local_cas_pb2.CaptureFilesRequest()
if instance_name:
request.instance_name = instance_name
request.path.append(path)
local_cas = self._get_local_cas()
response = local_cas.CaptureFiles(request)
if len(response.responses) != 1:
raise CASCacheError("Expected 1 response from CaptureFiles, got {}".format(len(response.responses)))
blob_response = response.responses[0]
if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
raise CASCacheError("Cache too full", reason="cache-too-full")
if blob_response.status.code != code_pb2.OK:
raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
digest.CopyFrom(blob_response.digest)
return digest
# import_directory():
#
# Import directory tree into CAS.
#
# Args:
# path (str): Path to directory to import
#
# Returns:
# (Digest): The digest of the imported directory
#
def import_directory(self, path):
local_cas = self._get_local_cas()
request = local_cas_pb2.CaptureTreeRequest()
request.path.append(path)
response = local_cas.CaptureTree(request)
if len(response.responses) != 1:
raise CASCacheError("Expected 1 response from CaptureTree, got {}".format(len(response.responses)))
tree_response = response.responses[0]
if tree_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
raise CASCacheError("Cache too full", reason="cache-too-full")
if tree_response.status.code != code_pb2.OK:
raise CASCacheError("Failed to capture tree {}: {}".format(path, tree_response.status.code))
treepath = self.objpath(tree_response.tree_digest)
tree = remote_execution_pb2.Tree()
with open(treepath, 'rb') as f:
tree.ParseFromString(f.read())
root_directory = tree.root.SerializeToString()
return utils._message_digest(root_directory)
# set_ref():
#
# Create or replace a ref.
#
# Args:
    #     ref (str): The name of the ref
    #     tree (Digest): The digest to store in the ref
    #
def set_ref(self, ref, tree):
refpath = self._refpath(ref)
os.makedirs(os.path.dirname(refpath), exist_ok=True)
with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
f.write(tree.SerializeToString())
# resolve_ref():
#
# Resolve a ref to a digest.
#
# Args:
# ref (str): The name of the ref
# update_mtime (bool): Whether to update the mtime of the ref
#
# Returns:
# (Digest): The digest stored in the ref
#
def resolve_ref(self, ref, *, update_mtime=False):
refpath = self._refpath(ref)
try:
with open(refpath, 'rb') as f:
if update_mtime:
os.utime(refpath)
digest = remote_execution_pb2.Digest()
digest.ParseFromString(f.read())
return digest
except FileNotFoundError as e:
raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
# update_mtime()
#
# Update the mtime of a ref.
#
# Args:
# ref (str): The ref to update
#
def update_mtime(self, ref):
try:
os.utime(self._refpath(ref))
except FileNotFoundError as e:
raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
# remove():
#
# Removes the given symbolic ref from the repo.
#
# Args:
# ref (str): A symbolic ref
    #     basedir (str): Path of base directory the ref is in, defaults to
    #                    the CAS refs/heads directory
#
def remove(self, ref, *, basedir=None):
if basedir is None:
basedir = os.path.join(self.casdir, 'refs', 'heads')
# Remove cache ref
self._remove_ref(ref, basedir)
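    # update_tree_mtime():
    #
    # Update the mtime of all objects reachable from the given directory tree.
    #
    # Args:
    #     tree (Digest): The digest of the toplevel directory
    #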
def update_tree_mtime(self, tree):
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
# remote_missing_blobs_for_directory():
#
# Determine which blobs of a directory tree are missing on the remote.
#
# Args:
    #     remote (CASRemote): The remote repository to check
    #     digest (Digest): The directory digest
#
# Returns: List of missing Digest objects
#
def remote_missing_blobs_for_directory(self, remote, digest):
required_blobs = self.required_blobs_for_directory(digest)
return self.remote_missing_blobs(remote, required_blobs)
# remote_missing_blobs():
#
# Determine which blobs are missing on the remote.
#
# Args:
    #     remote (CASRemote): The remote repository to check
    #     blobs ([Digest]): List of blob Digests to check
#
# Returns: List of missing Digest objects
#
def remote_missing_blobs(self, remote, blobs):
cas = self._get_cas()
instance_name = remote.local_cas_instance_name
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(iter(blobs), 512):
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=instance_name)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.CopyFrom(required_digest)
try:
response = cas.FindMissingBlobs(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.INVALID_ARGUMENT and e.details().startswith("Invalid instance name"):
raise CASCacheError("Unsupported buildbox-casd version: FindMissingBlobs failed") from e
raise
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.CopyFrom(missing_digest)
missing_blobs[d.hash] = d
return missing_blobs.values()
# local_missing_blobs():
#
# Check local cache for missing blobs.
#
# Args:
# digests (list): The Digests of blobs to check
#
# Returns: Missing Digest objects
#
def local_missing_blobs(self, digests):
missing_blobs = []
for digest in digests:
objpath = self.objpath(digest)
if not os.path.exists(objpath):
missing_blobs.append(digest)
return missing_blobs
# required_blobs_for_directory():
#
# Generator that returns the Digests of all blobs in the tree specified by
# the Digest of the toplevel Directory object.
#
def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None):
if not excluded_subdirs:
excluded_subdirs = []
# parse directory, and recursively add blobs
yield directory_digest
directory = remote_execution_pb2.Directory()
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
yield filenode.digest
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
yield from self.required_blobs_for_directory(dirnode.digest)
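    # diff_trees():
    #
    # Compute the lists of added, removed and modified files between two
    # directory trees by walking both trees in parallel.
    #
    # Args:
    #     tree_a (Digest): The first directory digest, or None
    #     tree_b (Digest): The second directory digest, or None
    #     added (list): List to which paths of added files are appended
    #     removed (list): List to which paths of removed files are appended
    #     modified (list): List to which paths of modified files are appended
    #     path (str): The current subdirectory path, used during recursion
    #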
def diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
dir_a = remote_execution_pb2.Directory()
dir_b = remote_execution_pb2.Directory()
if tree_a:
with open(self.objpath(tree_a), 'rb') as f:
dir_a.ParseFromString(f.read())
if tree_b:
with open(self.objpath(tree_b), 'rb') as f:
dir_b.ParseFromString(f.read())
a = 0
b = 0
while a < len(dir_a.files) or b < len(dir_b.files):
if b < len(dir_b.files) and (a >= len(dir_a.files) or
dir_a.files[a].name > dir_b.files[b].name):
added.append(os.path.join(path, dir_b.files[b].name))
b += 1
elif a < len(dir_a.files) and (b >= len(dir_b.files) or
dir_b.files[b].name > dir_a.files[a].name):
removed.append(os.path.join(path, dir_a.files[a].name))
a += 1
else:
# File exists in both directories
if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
modified.append(os.path.join(path, dir_a.files[a].name))
a += 1
b += 1
a = 0
b = 0
while a < len(dir_a.directories) or b < len(dir_b.directories):
if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
dir_a.directories[a].name > dir_b.directories[b].name):
self.diff_trees(None, dir_b.directories[b].digest,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_b.directories[b].name))
b += 1
elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
dir_b.directories[b].name > dir_a.directories[a].name):
self.diff_trees(dir_a.directories[a].digest, None,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_a.directories[a].name))
a += 1
else:
# Subdirectory exists in both directories
if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
self.diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_a.directories[a].name))
a += 1
b += 1
################################################
# Local Private Methods #
################################################
# _rotate_and_get_next_logfile()
#
# Get the logfile to use for casd
#
# This will ensure that we don't create too many casd log files by
# rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
#
# Returns:
# (str): the path to the log file to use
#
def _rotate_and_get_next_logfile(self):
log_dir = os.path.join(self.casdir, "logs")
try:
existing_logs = sorted(os.listdir(log_dir))
except FileNotFoundError:
os.makedirs(log_dir)
else:
while len(existing_logs) >= _CASD_MAX_LOGFILES:
logfile_to_delete = existing_logs.pop(0)
os.remove(os.path.join(log_dir, logfile_to_delete))
return os.path.join(log_dir, str(self._casd_start_time) + ".log")
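    # _refpath():
    #
    # Return the on-disk path of a ref within the CAS refs/heads directory.
    #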
def _refpath(self, ref):
return os.path.join(self.casdir, 'refs', 'heads', ref)
# _remove_ref()
#
# Removes a ref.
#
# This also takes care of pruning away directories which can
# be removed after having removed the given ref.
#
# Args:
# ref (str): The ref to remove
# basedir (str): Path of base directory the ref is in
#
# Raises:
    #     (CASCacheError): If the ref didn't exist, or a system error
# occurred while removing it
#
def _remove_ref(self, ref, basedir):
# Remove the ref itself
refpath = os.path.join(basedir, ref)
try:
os.unlink(refpath)
except FileNotFoundError as e:
raise CASCacheError("Could not find ref '{}'".format(ref)) from e
# Now remove any leading directories
components = list(os.path.split(ref))
while components:
components.pop()
refdir = os.path.join(basedir, *components)
# Break out once we reach the base
if refdir == basedir:
break
try:
os.rmdir(refdir)
except FileNotFoundError:
                # The parent directory did not exist, but its
                # parent directory might still be ready to prune
pass
except OSError as e:
if e.errno == errno.ENOTEMPTY:
# The parent directory was not empty, so we
# cannot prune directories beyond this point
break
# Something went wrong here
raise CASCacheError("System error while removing ref '{}': {}".format(ref, e)) from e
def _get_subdir(self, tree, subdir):
head, name = os.path.split(subdir)
if head:
tree = self._get_subdir(tree, head)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
for dirnode in directory.directories:
if dirnode.name == name:
return dirnode.digest
raise CASCacheError("Subdirectory {} not found".format(name))
def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
if tree.hash in reachable:
return
try:
if update_mtime:
os.utime(self.objpath(tree))
reachable.add(tree.hash)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
except FileNotFoundError:
if check_exists:
raise
# Just exit early if the file doesn't exist
return
for filenode in directory.files:
if update_mtime:
os.utime(self.objpath(filenode.digest))
if check_exists:
if not os.path.exists(self.objpath(filenode.digest)):
raise FileNotFoundError
reachable.add(filenode.digest.hash)
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
    # _temporary_object():
    #
    # Create a named temporary file with 0o0644 access rights.
    #
    # Returns:
    #     (file): A file object to a named temporary file.
    #
@contextlib.contextmanager
def _temporary_object(self):
with utils._tempnamedfile(dir=self.tmpdir) as f:
os.chmod(f.name,
stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
yield f
# _ensure_blob():
#
# Fetch and add blob if it's not already local.
#
# Args:
# remote (Remote): The remote to use.
# digest (Digest): Digest object for the blob to fetch.
#
# Returns:
# (str): The path of the object
#
def _ensure_blob(self, remote, digest):
objpath = self.objpath(digest)
if os.path.exists(objpath):
# already in local repository
return objpath
batch = _CASBatchRead(remote)
batch.add(digest)
batch.send()
return objpath
# Helper function for _fetch_directory().
def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
batch.send()
# All previously scheduled directories are now locally available,
# move them to the processing queue.
fetch_queue.extend(fetch_next_queue)
fetch_next_queue.clear()
return _CASBatchRead(remote)
# Helper function for _fetch_directory().
def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
in_local_cache = os.path.exists(self.objpath(digest))
        if not in_local_cache:
            # Not available locally yet, add it to the current fetch batch.
            batch.add(digest)
if recursive:
if in_local_cache:
# Add directory to processing queue.
fetch_queue.append(digest)
else:
# Directory will be available after completing pending batch.
# Add directory to deferred processing queue.
fetch_next_queue.append(digest)
return batch
# _fetch_directory():
#
# Fetches remote directory and adds it to content addressable store.
#
# This recursively fetches directory objects but doesn't fetch any
# files.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
#
def _fetch_directory(self, remote, dir_digest):
# TODO Use GetTree() if the server supports it
fetch_queue = [dir_digest]
fetch_next_queue = []
batch = _CASBatchRead(remote)
while len(fetch_queue) + len(fetch_next_queue) > 0:
if not fetch_queue:
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
dir_digest = fetch_queue.pop(0)
objpath = self._ensure_blob(remote, dir_digest)
directory = remote_execution_pb2.Directory()
with open(objpath, 'rb') as f:
directory.ParseFromString(f.read())
for dirnode in directory.directories:
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
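    # _fetch_tree():
    #
    # Fetch a Tree object from the remote and add each of its directories
    # to the local CAS as individual Directory objects.
    #
    # Args:
    #     remote (CASRemote): The remote to use.
    #     digest (Digest): Digest object for the Tree to fetch.
    #
    # Returns:
    #     (Digest): The digest of the root directory
    #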
def _fetch_tree(self, remote, digest):
objpath = self._ensure_blob(remote, digest)
tree = remote_execution_pb2.Tree()
with open(objpath, 'rb') as f:
tree.ParseFromString(f.read())
tree.children.extend([tree.root])
for directory in tree.children:
dirbuffer = directory.SerializeToString()
dirdigest = self.add_object(buffer=dirbuffer)
assert dirdigest.size_bytes == len(dirbuffer)
return dirdigest
# fetch_blobs():
#
# Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
#
# Args:
# remote (CASRemote): The remote repository to fetch from
# digests (list): The Digests of blobs to fetch
#
# Returns: The Digests of the blobs that were not available on the remote CAS
#
def fetch_blobs(self, remote, digests):
missing_blobs = []
remote.init()
batch = _CASBatchRead(remote)
for digest in digests:
batch.add(digest)
batch.send(missing_blobs=missing_blobs)
return missing_blobs
# send_blobs():
#
# Upload blobs to remote CAS.
#
# Args:
# remote (CASRemote): The remote repository to upload to
# digests (list): The Digests of Blobs to upload
#
def send_blobs(self, remote, digests):
batch = _CASBatchUpdate(remote)
for digest in digests:
batch.add(digest)
batch.send()
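    # _send_directory():
    #
    # Upload a directory tree to the remote, sending only the blobs which
    # the remote reports as missing.
    #
    # Args:
    #     remote (CASRemote): The remote repository to upload to
    #     digest (Digest): The digest of the directory to send
    #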
def _send_directory(self, remote, digest):
missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
# Upload any blobs missing on the server
self.send_blobs(remote, missing_blobs)
# _terminate_casd_process()
#
    # Terminate the buildbox-casd process
#
# Args:
# messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
#
def _terminate_casd_process(self, messenger=None):
return_code = self._casd_process.poll()
if return_code is not None:
# buildbox-casd is already dead
self._casd_process = None
if messenger:
messenger.message(
Message(
MessageType.BUG,
"Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
return_code, self.casd_logfile
),
)
)
return
self._casd_process.terminate()
try:
# Don't print anything if buildbox-casd terminates quickly
return_code = self._casd_process.wait(timeout=0.5)
except subprocess.TimeoutExpired:
if messenger:
cm = messenger.timed_activity("Terminating buildbox-casd")
else:
cm = contextlib.suppress()
with cm:
try:
return_code = self._casd_process.wait(timeout=15)
except subprocess.TimeoutExpired:
self._casd_process.kill()
self._casd_process.wait(timeout=15)
if messenger:
messenger.message(
Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
)
self._casd_process = None
return
if return_code != 0 and messenger:
messenger.message(
Message(
MessageType.BUG,
"Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
return_code, self.casd_logfile
),
)
)
self._casd_process = None
# get_cache_usage():
#
# Fetches the current usage of the CAS local cache.
#
# Returns:
# (CASCacheUsage): The current status
#
def get_cache_usage(self):
if not self._cache_usage_monitor:
self._cache_usage_monitor = _CASCacheUsageMonitor(self)
return self._cache_usage_monitor.get_cache_usage()
# get_casd_process()
#
# Get the underlying buildbox-casd process
#
# Returns:
# (subprocess.Process): The casd process that is used for the current cascache
#
def get_casd_process(self):
assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
return self._casd_process
# _CASCacheUsage
#
# A simple object to report the current CAS cache usage details.
#
# Args:
# used_size (int): Total size used by the local cache, in bytes.
# quota_size (int): Disk quota for the local cache, in bytes.
#
class _CASCacheUsage():
def __init__(self, used_size, quota_size):
self.used_size = used_size
self.quota_size = quota_size
if self.quota_size is None:
self.used_percent = 0
else:
self.used_percent = int(self.used_size * 100 / self.quota_size)
# Formattable into a human readable string
#
def __str__(self):
if self.used_size is None:
return "unknown"
elif self.quota_size is None:
return utils._pretty_size(self.used_size, dec_places=1)
else:
return "{} / {} ({}%)" \
.format(utils._pretty_size(self.used_size, dec_places=1),
utils._pretty_size(self.quota_size, dec_places=1),
self.used_percent)
# _CASCacheUsageMonitor
#
# This manages the subprocess that tracks cache usage information via
# buildbox-casd.
#
class _CASCacheUsageMonitor:
def __init__(self, cas):
self.cas = cas
# Shared memory (64-bit signed integer) for current disk usage and quota
self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1)
# multiprocessing.Process will fork without exec on Unix.
# This can't be allowed with background threads or open gRPC channels.
assert utils._is_single_threaded() and not cas.has_open_grpc_channels()
# Block SIGINT, we don't want to kill the process when we interrupt the frontend
        # and this process is very lightweight.
with _signals.blocked([signal.SIGINT], ignore=False):
self._subprocess = multiprocessing.Process(target=self._subprocess_run)
self._subprocess.start()
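    # get_cache_usage():
    #
    # Return the current cache usage as reported by the monitoring subprocess.
    #
    # Returns:
    #     (_CASCacheUsage): The current usage snapshot
    #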
def get_cache_usage(self):
disk_usage = self._disk_usage.value
disk_quota = self._disk_quota.value
if disk_usage < 0:
# Disk usage still unknown
disk_usage = None
if disk_quota <= 0:
# No disk quota
disk_quota = None
return _CASCacheUsage(disk_usage, disk_quota)
def release_resources(self):
# Simply terminate the subprocess, no cleanup required in the subprocess
self._subprocess.terminate()
def _subprocess_run(self):
# Reset SIGTERM in subprocess to default as no cleanup is necessary
signal.signal(signal.SIGTERM, signal.SIG_DFL)
disk_usage = self._disk_usage
disk_quota = self._disk_quota
local_cas = self.cas._get_local_cas()
while True:
try:
# Ask buildbox-casd for current value
request = local_cas_pb2.GetLocalDiskUsageRequest()
response = local_cas.GetLocalDiskUsage(request)
# Update values in shared memory
disk_usage.value = response.size_bytes
disk_quota.value = response.quota_bytes
except grpc.RpcError:
# Terminate loop when buildbox-casd becomes unavailable
break
# Sleep until next refresh
time.sleep(_CACHE_USAGE_REFRESH)
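# _grouper():
#
# Split an iterable into chunks of at most `n` elements, yielding each chunk
# as an iterator.
#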
def _grouper(iterable, n):
while True:
try:
current = next(iterable)
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))