blob: cea28f7edded73df47cfa6c4bf495e6ceb891f3d [file] [log] [blame]
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import hashlib
import itertools
import os
import stat
import tempfile
import uuid
import errno
import contextlib
from urllib.parse import urlparse
import grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .. import utils
from .._exceptions import CASError
# The default limit for gRPC messages is 4 MiB.
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
_MAX_PAYLOAD_BYTES = 1024 * 1024
# How often is a keepalive ping sent to the server to make sure the transport is still alive
_KEEPALIVE_TIME_MS = 60000
REMOTE_ASSET_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:v1:{}"
class _Attempt():
def __init__(self, last_attempt=False):
self.__passed = None
self.__last_attempt = last_attempt
def passed(self):
return self.__passed
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, traceback):
try:
if exc_type is None:
self.__passed = True
else:
self.__passed = False
if exc_value is not None:
raise exc_value
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
return not self.__last_attempt
elif e.code() == grpc.StatusCode.ABORTED:
raise CASRemoteError("grpc aborted: {}".format(str(e)),
detail=e.details(),
temporary=True) from e
else:
return False
return False
def _retry(tries=5):
for a in range(tries):
attempt = _Attempt(last_attempt=(a == tries - 1))
yield attempt
if attempt.passed():
break
class BlobNotFound(CASError):
def __init__(self, blob, msg):
self.blob = blob
super().__init__(msg)
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
# path (str): The root directory for the CAS repository
#
class CASCache():
def __init__(self, path):
self.casdir = os.path.join(path, 'cas')
self.tmpdir = os.path.join(path, 'tmp')
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
# preflight():
#
# Preflight check.
#
def preflight(self):
if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or
not os.path.isdir(os.path.join(self.casdir, 'objects'))):
raise CASError("CAS repository check failed for '{}'".format(self.casdir))
# contains():
#
# Check whether the specified ref is already available in the local CAS cache.
#
# Args:
# ref (str): The ref to check
#
# Returns: True if the ref is in the cache, False otherwise
#
def contains(self, ref):
refpath = self._refpath(ref)
# This assumes that the repository doesn't have any dangling pointers
return os.path.exists(refpath)
# extract():
#
# Extract cached directory for the specified ref if it hasn't
# already been extracted.
#
# Args:
# ref (str): The ref whose directory to extract
# path (str): The destination path
#
# Raises:
# CASError: In cases there was an OSError, or if the ref did not exist.
#
# Returns: path to extracted directory
#
def extract(self, ref, path):
tree = self.resolve_ref(ref, update_mtime=True)
dest = os.path.join(path, tree.hash)
if os.path.isdir(dest):
# directory has already been extracted
return dest
with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
checkoutdir = os.path.join(tmpdir, ref)
self._checkout(checkoutdir, tree)
os.makedirs(os.path.dirname(dest), exist_ok=True)
try:
os.rename(checkoutdir, dest)
except OSError as e:
# With rename it's possible to get either ENOTEMPTY or EEXIST
# in the case that the destination path is a not empty directory.
#
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
return dest
# commit():
#
# Commit directory to cache.
#
# Args:
# refs (list): The refs to set
# path (str): The directory to import
#
def commit(self, refs, path):
tree = self._commit_directory(path)
for ref in refs:
self.set_ref(ref, tree)
# diff():
#
# Return a list of files that have been added or modified between
# the refs described by ref_a and ref_b.
#
# Args:
# ref_a (str): The first ref
# ref_b (str): The second ref
# subdir (str): A subdirectory to limit the comparison to
#
def diff(self, ref_a, ref_b, *, subdir=None):
tree_a = self.resolve_ref(ref_a)
tree_b = self.resolve_ref(ref_b)
if subdir:
tree_a = self._get_subdir(tree_a, subdir)
tree_b = self._get_subdir(tree_b, subdir)
added = []
removed = []
modified = []
self._diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified)
return modified, removed, added
def initialize_remote(self, remote_spec, q):
try:
remote = CASRemote(remote_spec)
remote.init()
if remote.asset_fetch_supported:
if remote_spec.push and not remote.asset_push_supported:
q.put('Remote Asset server does not allow push')
else:
# No error
q.put(None)
else:
request = buildstream_pb2.StatusRequest()
for attempt in _retry():
with attempt:
response = remote.ref_storage.Status(request)
if remote_spec.push and not response.allow_updates:
q.put('CAS server does not allow push')
else:
# No error
q.put(None)
except grpc.RpcError as e:
# str(e) is too verbose for errors reported to the user
q.put(e.details())
except Exception as e: # pylint: disable=broad-except
# Whatever happens, we need to return it to the calling process
#
q.put(str(e))
# pull():
#
# Pull a ref from a remote repository.
#
# Args:
# ref (str): The ref to pull
# remote (CASRemote): The remote repository to pull from
# progress (callable): The progress callback, if any
#
# Returns:
# (bool): True if pull was successful, False if ref was not available
#
def pull(self, ref, remote, *, progress=None):
try:
remote.init()
if remote.asset_fetch_supported:
request = remote_asset_pb2.FetchDirectoryRequest()
request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref))
for attempt in _retry():
with attempt:
response = remote.remote_asset_fetch.FetchDirectory(request)
digest = response.root_directory_digest
else:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
for attempt in _retry():
with attempt:
response = remote.ref_storage.GetReference(request)
digest = response.digest
tree = remote_execution_pb2.Digest()
tree.hash = digest.hash
tree.size_bytes = digest.size_bytes
self._fetch_directory(remote, tree)
self.set_ref(ref, tree)
return True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
else:
return False
# link_ref():
#
# Add an alias for an existing ref.
#
# Args:
# oldref (str): An existing ref
# newref (str): A new ref for the same directory
#
def link_ref(self, oldref, newref):
tree = self.resolve_ref(oldref)
self.set_ref(newref, tree)
# push():
#
# Push committed refs to remote repository.
#
# Args:
# refs (list): The refs to push
# remote (CASRemote): The remote to push to
#
# Returns:
# (bool): True if any remote was updated, False if no pushes were required
#
# Raises:
# (CASError): if there was an error
#
def push(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)
# Check whether ref is already on the server in which case
# there is no need to push the ref
try:
if remote.asset_fetch_supported:
request = remote_asset_pb2.FetchDirectoryRequest()
request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref))
for attempt in _retry():
with attempt:
response = remote.remote_asset_fetch.FetchDirectory(request)
digest = response.root_directory_digest
else:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
for attempt in _retry():
with attempt:
response = remote.ref_storage.GetReference(request)
digest = response.digest
if digest.hash == tree.hash and digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise
self._send_directory(remote, tree)
if remote.asset_push_supported:
request = remote_asset_pb2.PushDirectoryRequest()
request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref))
request.root_directory_digest.hash = tree.hash
request.root_directory_digest.size_bytes = tree.size_bytes
for attempt in _retry():
with attempt:
remote.remote_asset_push.PushDirectory(request)
else:
request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
for attempt in _retry():
with attempt:
remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
return not skipped_remote
# objpath():
#
# Return the path of an object based on its digest.
#
# Args:
# digest (Digest): The digest of the object
#
# Returns:
# (str): The path of the object
#
def objpath(self, digest):
return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
# add_object():
#
# Hash and write object to CAS.
#
# Args:
# digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
# link_directly (bool): Whether file given by path can be linked
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
if digest is None:
digest = remote_execution_pb2.Digest()
try:
h = hashlib.sha256()
# Always write out new file to avoid corruption if input file is modified
with contextlib.ExitStack() as stack:
if path is not None and link_directly:
tmp = stack.enter_context(open(path, 'rb'))
for chunk in iter(lambda: tmp.read(4096), b""):
h.update(chunk)
else:
tmp = stack.enter_context(self._temporary_object())
if path:
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
h.update(chunk)
tmp.write(chunk)
else:
h.update(buffer)
tmp.write(buffer)
tmp.flush()
digest.hash = h.hexdigest()
digest.size_bytes = os.fstat(tmp.fileno()).st_size
# Place file at final location
objpath = self.objpath(digest)
os.makedirs(os.path.dirname(objpath), exist_ok=True)
os.link(tmp.name, objpath)
except FileExistsError as e:
# We can ignore the failed link() if the object is already in the repo.
pass
except OSError as e:
raise CASError("Failed to hash object: {}".format(e)) from e
return digest
# set_ref():
#
# Create or replace a ref.
#
# Args:
# ref (str): The name of the ref
#
def set_ref(self, ref, tree):
refpath = self._refpath(ref)
os.makedirs(os.path.dirname(refpath), exist_ok=True)
with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
f.write(tree.SerializeToString())
# resolve_ref():
#
# Resolve a ref to a digest.
#
# Args:
# ref (str): The name of the ref
# update_mtime (bool): Whether to update the mtime of the ref
#
# Returns:
# (Digest): The digest stored in the ref
#
def resolve_ref(self, ref, *, update_mtime=False):
refpath = self._refpath(ref)
try:
with open(refpath, 'rb') as f:
if update_mtime:
os.utime(refpath)
digest = remote_execution_pb2.Digest()
digest.ParseFromString(f.read())
return digest
except FileNotFoundError as e:
raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
# update_mtime()
#
# Update the mtime of a ref.
#
# Args:
# ref (str): The ref to update
#
def update_mtime(self, ref):
try:
os.utime(self._refpath(ref))
except FileNotFoundError as e:
raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
# calculate_cache_size()
#
# Return the real disk usage of the CAS cache.
#
# Returns:
# (int): The size of the cache.
#
def calculate_cache_size(self):
return utils._get_dir_size(self.casdir)
# list_refs():
#
# List refs in Least Recently Modified (LRM) order.
#
# Returns:
# (list) - A list of refs in LRM order
#
def list_refs(self):
# string of: /path/to/repo/refs/heads
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
refs = []
mtimes = []
for root, _, files in os.walk(ref_heads):
for filename in files:
ref_path = os.path.join(root, filename)
refs.append(os.path.relpath(ref_path, ref_heads))
# Obtain the mtime (the time a file was last modified)
mtimes.append(os.path.getmtime(ref_path))
return sorted(zip(mtimes, refs))
# list_objects():
#
# List cached objects in Least Recently Modified (LRM) order.
#
# Returns:
# (list) - A list of objects and timestamps in LRM order
#
def list_objects(self):
objs = []
mtimes = []
for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
for filename in files:
obj_path = os.path.join(root, filename)
try:
mtimes.append(os.path.getmtime(obj_path))
except FileNotFoundError:
pass
else:
objs.append(obj_path)
# NOTE: Sorted will sort from earliest to latest, thus the
# first element of this list will be the file modified earliest.
return sorted(zip(mtimes, objs))
def clean_up_refs_until(self, time):
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
for root, _, files in os.walk(ref_heads):
for filename in files:
ref_path = os.path.join(root, filename)
# Obtain the mtime (the time a file was last modified)
if os.path.getmtime(ref_path) < time:
os.unlink(ref_path)
# remove():
#
# Removes the given symbolic ref from the repo.
#
# Args:
# ref (str): A symbolic ref
#
def remove(self, ref):
# Remove cache ref
refpath = self._refpath(ref)
if not os.path.exists(refpath):
raise CASError("Could not find ref '{}'".format(ref))
os.unlink(refpath)
def update_tree_mtime(self, tree):
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
################################################
# Local Private Methods #
################################################
def _checkout(self, dest, tree):
os.makedirs(dest, exist_ok=True)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
# regular file, create hardlink
fullpath = os.path.join(dest, filenode.name)
os.link(self.objpath(filenode.digest), fullpath)
if filenode.is_executable:
os.chmod(fullpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
for dirnode in directory.directories:
fullpath = os.path.join(dest, dirnode.name)
self._checkout(fullpath, dirnode.digest)
for symlinknode in directory.symlinks:
# symlink
fullpath = os.path.join(dest, symlinknode.name)
os.symlink(symlinknode.target, fullpath)
def _refpath(self, ref):
return os.path.join(self.casdir, 'refs', 'heads', ref)
# _commit_directory():
#
# Adds local directory to content addressable store.
#
# Adds files, symbolic links and recursively other directories in
# a local directory to the content addressable store.
#
# Args:
# path (str): Path to the directory to add.
# dir_digest (Digest): An optional Digest object to use.
#
# Returns:
# (Digest): Digest object for the directory added.
#
def _commit_directory(self, path, *, dir_digest=None):
directory = remote_execution_pb2.Directory()
for name in sorted(os.listdir(path)):
full_path = os.path.join(path, name)
mode = os.lstat(full_path).st_mode
if stat.S_ISDIR(mode):
dirnode = directory.directories.add()
dirnode.name = name
self._commit_directory(full_path, dir_digest=dirnode.digest)
elif stat.S_ISREG(mode):
filenode = directory.files.add()
filenode.name = name
self.add_object(path=full_path, digest=filenode.digest)
filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
elif stat.S_ISLNK(mode):
symlinknode = directory.symlinks.add()
symlinknode.name = name
symlinknode.target = os.readlink(full_path)
else:
raise CASError("Unsupported file type for {}".format(full_path))
return self.add_object(digest=dir_digest,
buffer=directory.SerializeToString())
def _get_subdir(self, tree, subdir):
head, name = os.path.split(subdir)
if head:
tree = self._get_subdir(tree, head)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
for dirnode in directory.directories:
if dirnode.name == name:
return dirnode.digest
raise CASError("Subdirectory {} not found".format(name))
def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
dir_a = remote_execution_pb2.Directory()
dir_b = remote_execution_pb2.Directory()
if tree_a:
with open(self.objpath(tree_a), 'rb') as f:
dir_a.ParseFromString(f.read())
if tree_b:
with open(self.objpath(tree_b), 'rb') as f:
dir_b.ParseFromString(f.read())
a = 0
b = 0
while a < len(dir_a.files) or b < len(dir_b.files):
if b < len(dir_b.files) and (a >= len(dir_a.files) or
dir_a.files[a].name > dir_b.files[b].name):
added.append(os.path.join(path, dir_b.files[b].name))
b += 1
elif a < len(dir_a.files) and (b >= len(dir_b.files) or
dir_b.files[b].name > dir_a.files[a].name):
removed.append(os.path.join(path, dir_a.files[a].name))
a += 1
else:
# File exists in both directories
if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
modified.append(os.path.join(path, dir_a.files[a].name))
a += 1
b += 1
a = 0
b = 0
while a < len(dir_a.directories) or b < len(dir_b.directories):
if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
dir_a.directories[a].name > dir_b.directories[b].name):
self._diff_trees(None, dir_b.directories[b].digest,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_b.directories[b].name))
b += 1
elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
dir_b.directories[b].name > dir_a.directories[a].name):
self._diff_trees(dir_a.directories[a].digest, None,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_a.directories[a].name))
a += 1
else:
# Subdirectory exists in both directories
if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
self._diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
added=added, removed=removed, modified=modified,
path=os.path.join(path, dir_a.directories[a].name))
a += 1
b += 1
def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
if tree.hash in reachable:
return
if update_mtime:
os.utime(self.objpath(tree))
reachable.add(tree.hash)
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
if update_mtime:
os.utime(self.objpath(filenode.digest))
reachable.add(filenode.digest.hash)
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
d.hash = directory_digest.hash
d.size_bytes = directory_digest.size_bytes
yield d
directory = remote_execution_pb2.Directory()
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
d = remote_execution_pb2.Digest()
d.hash = filenode.digest.hash
d.size_bytes = filenode.digest.size_bytes
yield d
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
def _fetch_blob(self, remote, digest, stream):
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
for response in remote.bytestream.Read(request):
stream.write(response.data)
stream.flush()
assert digest.size_bytes == os.fstat(stream.fileno()).st_size
# _temporary_object():
#
# Returns:
# (file): A file object to a named temporary file.
#
# Create a named temporary file with 0o0644 access rights.
@contextlib.contextmanager
def _temporary_object(self):
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
os.chmod(f.name,
stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
yield f
# _ensure_blob():
#
# Fetch and add blob if it's not already local.
#
# Args:
# remote (Remote): The remote to use.
# digest (Digest): Digest object for the blob to fetch.
#
# Returns:
# (str): The path of the object
#
def _ensure_blob(self, remote, digest):
objpath = self.objpath(digest)
if os.path.exists(objpath):
# already in local repository
return objpath
with self._temporary_object() as f:
self._fetch_blob(remote, digest, f)
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
return objpath
def _batch_download_complete(self, batch):
for digest, data in batch.send():
with self._temporary_object() as f:
f.write(data)
f.flush()
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
# Helper function for _fetch_directory().
def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
self._batch_download_complete(batch)
# All previously scheduled directories are now locally available,
# move them to the processing queue.
fetch_queue.extend(fetch_next_queue)
fetch_next_queue.clear()
return _CASBatchRead(remote)
# Helper function for _fetch_directory().
def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
in_local_cache = os.path.exists(self.objpath(digest))
if in_local_cache:
# Skip download, already in local cache.
pass
elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_read_supported):
# Too large for batch request, download in independent request.
self._ensure_blob(remote, digest)
in_local_cache = True
else:
if not batch.add(digest):
# Not enough space left in batch request.
# Complete pending batch first.
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
batch.add(digest)
if recursive:
if in_local_cache:
# Add directory to processing queue.
fetch_queue.append(digest)
else:
# Directory will be available after completing pending batch.
# Add directory to deferred processing queue.
fetch_next_queue.append(digest)
return batch
# _fetch_directory():
#
# Fetches remote directory and adds it to content addressable store.
#
# Fetches files, symbolic links and recursively other directories in
# the remote directory and adds them to the content addressable
# store.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
#
def _fetch_directory(self, remote, dir_digest):
fetch_queue = [dir_digest]
fetch_next_queue = []
batch = _CASBatchRead(remote)
while len(fetch_queue) + len(fetch_next_queue) > 0:
if len(fetch_queue) == 0:
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
dir_digest = fetch_queue.pop(0)
objpath = self._ensure_blob(remote, dir_digest)
directory = remote_execution_pb2.Directory()
with open(objpath, 'rb') as f:
directory.ParseFromString(f.read())
for dirnode in directory.directories:
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
for filenode in directory.files:
batch = self._fetch_directory_node(remote, filenode.digest, batch,
fetch_queue, fetch_next_queue)
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
digest.hash, str(digest.size_bytes)])
def request_stream(resname, instream):
offset = 0
finished = False
remaining = digest.size_bytes
while not finished:
chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
remaining -= chunk_size
request = bytestream_pb2.WriteRequest()
request.write_offset = offset
# max. _MAX_PAYLOAD_BYTES chunks
request.data = instream.read(chunk_size)
request.resource_name = resname
request.finish_write = remaining <= 0
yield request
offset += chunk_size
finished = request.finish_write
for attempt in _retry():
with attempt:
response = remote.bytestream.Write(request_stream(resource_name, stream))
assert response.committed_size == digest.size_bytes
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
required_blobs = self._required_blobs(digest)
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest()
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
for attempt in _retry():
with attempt:
response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
missing_blobs[d.hash] = d
# Upload any blobs missing on the server
self._send_blobs(remote, missing_blobs.values(), u_uid)
def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
batch = _CASBatchUpdate(remote)
for digest in digests:
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
if (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_update_supported):
# Too large for batch request, upload in independent request.
self._send_blob(remote, digest, f, u_uid=u_uid)
else:
if not batch.add(digest, f):
# Not enough space left in batch request.
# Complete pending batch first.
batch.send()
batch = _CASBatchUpdate(remote)
batch.add(digest, f)
# Send final batch
batch.send()
# Represents a single remote CAS cache.
#
class CASRemote():
def __init__(self, spec):
self.spec = spec
self._initialized = False
self.channel = None
self.bytestream = None
self.cas = None
self.ref_storage = None
def init(self):
if not self._initialized:
url = urlparse(self.spec.url)
if url.scheme == 'http':
port = url.port or 80
self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port),
options=[("grpc.keepalive_time_ms", _KEEPALIVE_TIME_MS)])
elif url.scheme == 'https':
port = url.port or 443
if self.spec.server_cert:
with open(self.spec.server_cert, 'rb') as f:
server_cert_bytes = f.read()
else:
server_cert_bytes = None
if self.spec.client_key:
with open(self.spec.client_key, 'rb') as f:
client_key_bytes = f.read()
else:
client_key_bytes = None
if self.spec.client_cert:
with open(self.spec.client_cert, 'rb') as f:
client_cert_bytes = f.read()
else:
client_cert_bytes = None
credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
private_key=client_key_bytes,
certificate_chain=client_cert_bytes)
self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials,
options=[("grpc.keepalive_time_ms", _KEEPALIVE_TIME_MS)])
else:
raise CASError("Unsupported URL: {}".format(self.spec.url))
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
self.remote_asset_fetch = remote_asset_pb2_grpc.FetchStub(self.channel)
self.remote_asset_push = remote_asset_pb2_grpc.PushStub(self.channel)
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
try:
request = remote_execution_pb2.GetCapabilitiesRequest()
for attempt in _retry():
with attempt:
response = self.capabilities.GetCapabilities(request)
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
except grpc.RpcError as e:
# Simply use the defaults for servers that don't implement GetCapabilities()
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
raise
# Check whether the server supports BatchReadBlobs()
self.batch_read_supported = False
try:
request = remote_execution_pb2.BatchReadBlobsRequest()
for attempt in _retry():
with attempt:
response = self.cas.BatchReadBlobs(request)
self.batch_read_supported = True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
raise
# Check whether the server supports BatchUpdateBlobs()
self.batch_update_supported = False
try:
request = remote_execution_pb2.BatchUpdateBlobsRequest()
for attempt in _retry():
with attempt:
response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
except grpc.RpcError as e:
if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
e.code() != grpc.StatusCode.PERMISSION_DENIED):
raise
self.asset_fetch_supported = False
try:
request = remote_asset_pb2.FetchDirectoryRequest()
for attempt in _retry():
with attempt:
response = self.remote_asset_fetch.FetchDirectory(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
# Expected error as the request doesn't specify any URIs.
self.asset_fetch_supported = True
elif e.code() != grpc.StatusCode.UNIMPLEMENTED:
raise
self.asset_push_supported = False
try:
request = remote_asset_pb2.PushDirectoryRequest()
for attempt in _retry():
with attempt:
response = self.remote_asset_push.PushDirectory(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
# Expected error as the request doesn't specify any URIs.
self.asset_push_supported = True
elif e.code() != grpc.StatusCode.UNIMPLEMENTED:
raise
self._initialized = True
# Represents a batch of blobs queued for fetching.
#
class _CASBatchRead():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
self._request = remote_execution_pb2.BatchReadBlobsRequest()
self._size = 0
self._sent = False
def add(self, digest):
assert not self._sent
new_batch_size = self._size + digest.size_bytes
if new_batch_size > self._max_total_size_bytes:
# Not enough space left in current batch
return False
request_digest = self._request.digests.add()
request_digest.hash = digest.hash
request_digest.size_bytes = digest.size_bytes
self._size = new_batch_size
return True
def send(self):
assert not self._sent
self._sent = True
if len(self._request.digests) == 0:
return
for attempt in _retry():
with attempt:
batch_response = self._remote.cas.BatchReadBlobs(self._request)
for response in batch_response.responses:
if response.status.code == grpc.StatusCode.NOT_FOUND.value[0]:
raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if response.status.code != grpc.StatusCode.OK.value[0]:
raise CASError("Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if response.digest.size_bytes != len(response.data):
raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
response.digest.hash, response.digest.size_bytes, len(response.data)))
yield (response.digest, response.data)
# Represents a batch of blobs queued for upload.
#
class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
self._size = 0
self._sent = False
def add(self, digest, stream):
assert not self._sent
new_batch_size = self._size + digest.size_bytes
if new_batch_size > self._max_total_size_bytes:
# Not enough space left in current batch
return False
blob_request = self._request.requests.add()
blob_request.digest.hash = digest.hash
blob_request.digest.size_bytes = digest.size_bytes
blob_request.data = stream.read(digest.size_bytes)
self._size = new_batch_size
return True
def send(self):
assert not self._sent
self._sent = True
if len(self._request.requests) == 0:
return
for attempt in _retry():
with attempt:
batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
for response in batch_response.responses:
if response.status.code != grpc.StatusCode.OK.value[0]:
raise CASError("Failed to upload blob {}: {}".format(
response.digest.hash, response.status.code))
def _grouper(iterable, n):
while True:
try:
current = next(iterable)
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))