blob: 2aff29b9814f7b437928da484ccf845f6c5e3a8f [file] [log] [blame]
#
# Copyright (C) 2018 Bloomberg LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jim MacArthur <jim.macarthur@codethink.co.uk>
"""
CasBasedDirectory
=================
Implementation of the Directory class which backs onto a Merkle-tree based content
addressable storage system.
See also: :ref:`sandboxing`.
"""
import os
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .directory import Directory, VirtualDirectoryError, _FileType
from ._filebaseddirectory import FileBasedDirectory
from ..utils import FileListResult, _magic_timestamp
class IndexEntry:
    """Directory entry used in CasBasedDirectory.index.

    An entry describes one child of a directory. ``type`` holds its _FileType.
    A subdirectory entry is represented either by a ``digest`` (not loaded yet)
    or by an instantiated ``buildstream_object``; get_directory() turns the
    former into the latter.
    """

    def __init__(self, name, entrytype, *, digest=None, target=None, is_executable=False,
                 buildstream_object=None, modified=False):
        self.name = name
        self.type = entrytype
        # Content digest; cleared by get_directory() once a directory object exists.
        self.digest = digest
        # Link target; only used by symlink entries.
        self.target = target
        self.is_executable = is_executable
        # Lazily created CasBasedDirectory for subdirectory entries.
        self.buildstream_object = buildstream_object
        # Set when the entry was (re)written; cleared by the mark_unmodified family.
        self.modified = modified

    def get_directory(self, parent):
        """Return the directory object for this entry, creating it on first use.

        On first use a CasBasedDirectory is built from ``self.digest``, with
        *parent* as its parent and this entry's name as its filename. The new
        object then owns the digest, so ``self.digest`` is reset to None. Later
        calls return the cached object and ignore *parent*.
        """
        if self.buildstream_object:
            return self.buildstream_object
        self.buildstream_object = CasBasedDirectory(parent.cas_cache, digest=self.digest,
                                                    parent=parent, filename=self.name)
        self.digest = None
        return self.buildstream_object

    def get_digest(self):
        """Return the stored digest, or ask the directory object to compute one
        when this entry no longer holds a digest itself."""
        return self.digest if self.digest else self.buildstream_object._get_digest()
class ResolutionException(VirtualDirectoryError):
    """Common base class for the exceptions CasBasedDirectory._resolve may raise.

    Intended for use only within this module.
    """
class InfiniteSymlinkException(ResolutionException):
    """Signals that following symlinks led into an endless loop."""
class AbsoluteSymlinkException(ResolutionException):
    """Signals an attempt to follow an absolute symlink while following
    such symlinks is disallowed.

    An absolute symlink is one whose target begins with the path separator.
    """
class UnexpectedFileException(ResolutionException):
    """Raised if we found a file where a directory or symlink was
    expected, for example when we try to resolve a symlink pointing to
    /a/b/c but /a/b is a file.
    """
    def __init__(self, message=""):
        """Allow construction with no arguments, since this can be raised in
        places where there isn't sufficient information to write the
        message.

        Args:
            message (str): Optional description of the failure; defaults to "".
        """
        super().__init__(message)
# CasBasedDirectory intentionally doesn't call its superclass constructor,
# which is meant to be unimplemented.
# pylint: disable=super-init-not-called
class CasBasedDirectory(Directory):
    """
    CAS-based directories can have two names; one is a 'common name' which has no effect
    on functionality, and the 'filename'. If a CasBasedDirectory has a parent, then 'filename'
    must be the name of an entry in the parent directory's index which points to this object.
    This is used to inform a parent directory that it must update the given hash for this
    object when this object changes.

    Typically a top-level CasBasedDirectory will have a common_name and no filename, and
    subdirectories will have a filename and no common_name. common_name can be used to identify
    CasBasedDirectory objects in a log file, since they have no unique position in a file
    system.
    """

    # Two constants which define the separators used by the remote execution API.
    _pb2_path_sep = "/"
    _pb2_absolute_path_prefix = "/"

    def __init__(self, cas_cache, *, digest=None, parent=None, common_name="untitled", filename=None):
        self.filename = filename
        self.common_name = common_name
        self.cas_cache = cas_cache
        # Cached Digest of this directory's serialized Directory proto.
        # None means _get_digest() must rebuild and store it.
        self.__digest = digest
        # Maps entry name -> IndexEntry (subdirectory, regular file or symlink).
        self.index = {}
        self.parent = parent
        if digest:
            self._populate_index(digest)

    def _populate_index(self, digest):
        """Fill self.index from the Directory proto stored in the CAS under *digest*.

        Subdirectories are recorded by digest only. Their CasBasedDirectory
        objects are created lazily by IndexEntry.get_directory().

        Raises:
            VirtualDirectoryError: if the Directory object is not in the local cache.
        """
        try:
            pb2_directory = remote_execution_pb2.Directory()
            with open(self.cas_cache.objpath(digest), 'rb') as f:
                pb2_directory.ParseFromString(f.read())
        except FileNotFoundError as e:
            raise VirtualDirectoryError("Directory not found in local cache: {}".format(e)) from e

        for entry in pb2_directory.directories:
            self.index[entry.name] = IndexEntry(entry.name, _FileType.DIRECTORY,
                                                digest=entry.digest)
        for entry in pb2_directory.files:
            self.index[entry.name] = IndexEntry(entry.name, _FileType.REGULAR_FILE,
                                                digest=entry.digest,
                                                is_executable=entry.is_executable)
        for entry in pb2_directory.symlinks:
            self.index[entry.name] = IndexEntry(entry.name, _FileType.SYMLINK,
                                                target=entry.target)

    def _find_self_in_parent(self):
        """Return the name under which self.parent indexes this object,
        or None if no entry in the parent refers to it."""
        assert self.parent is not None
        parent = self.parent
        for (k, v) in parent.index.items():
            if v.buildstream_object == self:
                return k
        return None

    def _add_directory(self, name):
        """Create an empty subdirectory *name* and return it.

        *name* must not already exist in the index. The cached digest of this
        directory is invalidated.
        """
        assert name not in self.index
        newdir = CasBasedDirectory(self.cas_cache, parent=self, filename=name)
        self.index[name] = IndexEntry(name, _FileType.DIRECTORY, buildstream_object=newdir)
        self.__invalidate_digest()
        return newdir

    def _add_file(self, basename, filename, modified=False):
        """Add the local file basename/filename as a regular-file entry.

        The contents are added to the CAS via cas_cache.add_object(). The
        executable flag comes from os.access(path, os.X_OK). The entry is
        marked modified if *modified* is true or if it replaces an existing entry.
        """
        entry = IndexEntry(filename, _FileType.REGULAR_FILE,
                           modified=modified or filename in self.index)
        path = os.path.join(basename, filename)
        entry.digest = self.cas_cache.add_object(path=path)
        entry.is_executable = os.access(path, os.X_OK)
        self.index[filename] = entry
        self.__invalidate_digest()

    def _copy_link_from_filesystem(self, basename, filename):
        """Add a symlink entry whose target is read from basename/filename."""
        self._add_new_link_direct(filename, os.readlink(os.path.join(basename, filename)))

    def _add_new_link_direct(self, name, target):
        """Add (or replace) a symlink entry *name* pointing at *target*.

        The entry is marked modified only when it replaces an existing entry.
        """
        self.index[name] = IndexEntry(name, _FileType.SYMLINK, target=target, modified=name in self.index)
        self.__invalidate_digest()

    def delete_entry(self, name):
        """Remove *name* from this directory if present; absent names are ignored."""
        if name in self.index:
            del self.index[name]
            self.__invalidate_digest()

    def descend(self, *paths, create=False):
        """Descend one or more levels of directory hierarchy and return a new
        Directory object for that directory.

        Arguments:
        * *paths (str): A list of strings which are all directory names.
          Empty strings are skipped.
        * create (boolean): If this is true, the directories will be created if
          they don't already exist.

        Raises:
            VirtualDirectoryError: if a component is not a directory, or does
            not exist and *create* is false.

        Note: At the moment, creating a directory by descending does
        not update this object in the CAS cache. However, performing
        an import_files() into a subdirectory of any depth obtained by
        descending from this object *will* cause this directory to be
        updated and stored.
        """
        current_dir = self

        for path in paths:
            # Skip empty path segments
            if not path:
                continue

            entry = current_dir.index.get(path)
            if entry:
                if entry.type == _FileType.DIRECTORY:
                    current_dir = entry.get_directory(current_dir)
                else:
                    error = "Cannot descend into {}, which is a '{}' in the directory {}"
                    raise VirtualDirectoryError(error.format(path,
                                                             current_dir.index[path].type,
                                                             current_dir))
            else:
                if create:
                    current_dir = current_dir._add_directory(path)
                else:
                    error = "'{}' not found in {}"
                    raise VirtualDirectoryError(error.format(path, str(current_dir)))

        return current_dir

    def _check_replacement(self, name, relative_pathname, fileListResult):
        """Check whether 'name' exists and, if so, whether it may be overwritten.

        If it may be overwritten, the existing entry is deleted and
        relative_pathname is appended to fileListResult.overwritten. A non-empty
        directory is never overwritten; relative_pathname is then appended to
        fileListResult.ignored instead.

        Returns:
            bool: True if the import of 'name' should go ahead.
        """
        existing_entry = self.index.get(name)
        if existing_entry is None:
            return True
        elif existing_entry.type == _FileType.DIRECTORY:
            # If 'name' maps to a DirectoryNode, then there must be an entry in index
            # pointing to another Directory.
            subdir = existing_entry.get_directory(self)
            if subdir.is_empty():
                self.delete_entry(name)
                fileListResult.overwritten.append(relative_pathname)
                return True
            else:
                # We can't overwrite a non-empty directory, so we just ignore it.
                fileListResult.ignored.append(relative_pathname)
                return False
        else:
            self.delete_entry(name)
            fileListResult.overwritten.append(relative_pathname)
            return True

    def _import_files_from_directory(self, source_directory, filter_callback, *, path_prefix="", result):
        """Import files from a traditional (filesystem) directory.

        Subdirectories are recursed into before the filter is consulted, so that a
        subdirectory which is itself filtered out still receives any accepted
        descendants. It is removed again only if it was newly created and ended up empty.
        """
        for direntry in os.scandir(source_directory):
            # The destination filename, relative to the root where the import started
            relative_pathname = os.path.join(path_prefix, direntry.name)

            is_dir = direntry.is_dir(follow_symlinks=False)

            if is_dir:
                src_subdir = os.path.join(source_directory, direntry.name)

                try:
                    create_subdir = direntry.name not in self.index
                    dest_subdir = self.descend(direntry.name, create=create_subdir)
                except VirtualDirectoryError as e:
                    filetype = self.index[direntry.name].type
                    raise VirtualDirectoryError('Destination is a {}, not a directory: /{}'
                                                .format(filetype, relative_pathname)) from e

                dest_subdir._import_files_from_directory(src_subdir, filter_callback,
                                                         path_prefix=relative_pathname, result=result)

            if filter_callback and not filter_callback(relative_pathname):
                if is_dir and create_subdir and dest_subdir.is_empty():
                    # Complete subdirectory has been filtered out, remove it
                    self.delete_entry(direntry.name)

                # Entry filtered out, move to next
                continue

            if direntry.is_file(follow_symlinks=False):
                if self._check_replacement(direntry.name, relative_pathname, result):
                    self._add_file(source_directory, direntry.name, modified=relative_pathname in result.overwritten)
                    result.files_written.append(relative_pathname)
            elif direntry.is_symlink():
                if self._check_replacement(direntry.name, relative_pathname, result):
                    self._copy_link_from_filesystem(source_directory, direntry.name)
                    result.files_written.append(relative_pathname)

    def _partial_import_cas_into_cas(self, source_directory, filter_callback, *, path_prefix="", result):
        """Import files from a CAS-based directory.

        With no filter, a subdirectory that does not yet exist at the
        destination is imported wholesale by digest. Otherwise, subdirectories
        are merged entry by entry.
        """
        for name, entry in source_directory.index.items():
            # The destination filename, relative to the root where the import started
            relative_pathname = os.path.join(path_prefix, name)

            is_dir = entry.type == _FileType.DIRECTORY

            if is_dir:
                create_subdir = name not in self.index

                if create_subdir and not filter_callback:
                    # If subdirectory does not exist yet and there is no filter,
                    # we can import the whole source directory by digest instead
                    # of importing each directory entry individually.
                    subdir_digest = entry.get_digest()
                    dest_entry = IndexEntry(name, _FileType.DIRECTORY, digest=subdir_digest)
                    self.index[name] = dest_entry
                    self.__invalidate_digest()

                    # However, we still need to iterate over the directory entries
                    # to fill in `result.files_written`.

                    # Use source subdirectory object if it already exists,
                    # otherwise create object for destination subdirectory.
                    # This is based on the assumption that the destination
                    # subdirectory is more likely to be modified later on
                    # (e.g., by further import_files() calls).
                    if entry.buildstream_object:
                        subdir = entry.buildstream_object
                    else:
                        subdir = dest_entry.get_directory(self)

                    subdir.__add_files_to_result(path_prefix=relative_pathname, result=result)
                else:
                    src_subdir = source_directory.descend(name)

                    try:
                        dest_subdir = self.descend(name, create=create_subdir)
                    except VirtualDirectoryError as e:
                        filetype = self.index[name].type
                        raise VirtualDirectoryError('Destination is a {}, not a directory: /{}'
                                                    .format(filetype, relative_pathname)) from e

                    dest_subdir._partial_import_cas_into_cas(src_subdir, filter_callback,
                                                             path_prefix=relative_pathname, result=result)

            if filter_callback and not filter_callback(relative_pathname):
                # dest_subdir is bound here: with a filter set, directories
                # always take the entry-by-entry branch above.
                if is_dir and create_subdir and dest_subdir.is_empty():
                    # Complete subdirectory has been filtered out, remove it
                    self.delete_entry(name)

                # Entry filtered out, move to next
                continue

            if not is_dir:
                if self._check_replacement(name, relative_pathname, result):
                    if entry.type == _FileType.REGULAR_FILE:
                        self.index[name] = IndexEntry(name, _FileType.REGULAR_FILE,
                                                      digest=entry.digest,
                                                      is_executable=entry.is_executable,
                                                      modified=True)
                        self.__invalidate_digest()
                    else:
                        assert entry.type == _FileType.SYMLINK
                        self._add_new_link_direct(name=name, target=entry.target)
                    result.files_written.append(relative_pathname)

    def import_files(self, external_pathspec, *,
                     filter_callback=None,
                     report_written=True, update_mtime=False,
                     can_link=False):
        """ See superclass Directory for arguments """

        result = FileListResult()

        if isinstance(external_pathspec, FileBasedDirectory):
            source_directory = external_pathspec._get_underlying_directory()
            self._import_files_from_directory(source_directory, filter_callback, result=result)
        elif isinstance(external_pathspec, str):
            source_directory = external_pathspec
            self._import_files_from_directory(source_directory, filter_callback, result=result)
        else:
            assert isinstance(external_pathspec, CasBasedDirectory)
            self._partial_import_cas_into_cas(external_pathspec, filter_callback, result=result)

        # TODO: No notice is taken of report_written, update_mtime or can_link.
        # Current behaviour is to fully populate the report, which is inefficient,
        # but still correct.
        return result

    def set_deterministic_mtime(self):
        """ Sets a static modification time for all regular files in this directory.
        Since we don't store any modification time, we don't need to do anything.
        """

    def set_deterministic_user(self):
        """ Sets all files in this directory to the current user's euid/egid.
        We also don't store user data, so this can be ignored.
        """

    def export_files(self, to_directory, *, can_link=False, can_destroy=False):
        """Copies everything from this into to_directory, which must be the name
        of a traditional filesystem directory.

        Arguments:

        to_directory (string): a path outside this directory object
        where the contents will be copied to.

        can_link (bool): Whether we can create hard links in to_directory
        instead of copying.

        can_destroy (bool): Whether we can destroy elements in this
        directory to export them (e.g. by renaming them as the
        target). Currently ignored.
        """
        self.cas_cache.checkout(to_directory, self._get_digest(), can_link=can_link)

    def export_to_tar(self, tarfile, destination_dir, mtime=_magic_timestamp):
        """Not implemented for CAS-based directories."""
        raise NotImplementedError()

    def mark_changed(self):
        """ It should not be possible to externally modify a CAS-based
        directory at the moment."""
        raise NotImplementedError()

    def is_empty(self):
        """ Return true if this directory has no files, subdirectories or links in it.
        """
        return len(self.index) == 0

    def _mark_directory_unmodified(self):
        """Mark every entry in this directory, and in every instantiated child
        directory, as unmodified."""
        for i in self.index.values():
            i.modified = False
            if i.type == _FileType.DIRECTORY and i.buildstream_object:
                i.buildstream_object._mark_directory_unmodified()

    def _mark_entry_unmodified(self, name):
        """Mark entry *name* as unmodified; if it is an instantiated directory,
        recursively mark its whole tree as unmodified too."""
        self.index[name].modified = False
        if self.index[name].buildstream_object:
            self.index[name].buildstream_object._mark_directory_unmodified()

    def mark_unmodified(self):
        """ Marks all files in this directory (recursively) as unmodified.
        If we have a parent, we mark our own entry as unmodified in that parent's
        index.
        """
        if self.parent:
            self.parent._mark_entry_unmodified(self._find_self_in_parent())
        else:
            self._mark_directory_unmodified()

    def _lightweight_resolve_to_index(self, path):
        """A lightweight function for transforming paths into IndexEntry
        objects. This does not follow symlinks.

        path: The string to resolve. This should be a series of path
        components separated by the protocol buffer path separator
        _pb2_path_sep.

        Returns: the IndexEntry found, or None if any of the path components
        were not present or an intermediate component is not a directory.
        """
        directory = self
        path_components = path.split(CasBasedDirectory._pb2_path_sep)
        for component in path_components[:-1]:
            entry = directory.index.get(component)
            if entry is None or entry.type != _FileType.DIRECTORY:
                return None
            # The parent of an instantiated subdirectory must be the directory
            # that actually contains it. Passing `self` here gave deeper
            # subdirectories the root as parent, so later changes invalidated
            # the wrong digests and _find_self_in_parent() could not find them.
            directory = entry.get_directory(directory)
        return directory.index.get(path_components[-1], None)

    def list_modified_paths(self):
        """Yield the relative paths which have been modified since the
        last call to mark_unmodified.

        Return value: generator of str - modified paths, in the order
        produced by list_relative_paths()
        """
        for p in self.list_relative_paths():
            i = self._lightweight_resolve_to_index(p)
            if i and i.modified:
                yield p

    def list_relative_paths(self, relpath=""):
        """Yield all relative paths below this directory.

        The directory's own relpath is yielded first (unless it is ""), then
        its non-directory entries in name order, then each subdirectory
        recursively in name order.

        Return value: generator of str - all paths
        """
        file_list = list(filter(lambda i: i[1].type != _FileType.DIRECTORY,
                                self.index.items()))
        directory_list = filter(lambda i: i[1].type == _FileType.DIRECTORY,
                                self.index.items())

        if relpath != "":
            yield relpath

        for (k, _) in sorted(file_list):
            yield os.path.join(relpath, k)

        for (k, v) in sorted(directory_list):
            subdir = v.get_directory(self)
            yield from subdir.list_relative_paths(relpath=os.path.join(relpath, k))

    def get_size(self):
        """Return the total size in bytes of this tree.

        This is the serialized Directory proto, plus every regular file's
        digest size, plus the size of each subdirectory (recursively).
        """
        digest = self._get_digest()
        total = digest.size_bytes
        for i in self.index.values():
            if i.type == _FileType.DIRECTORY:
                subdir = i.get_directory(self)
                total += subdir.get_size()
            elif i.type == _FileType.REGULAR_FILE:
                total += i.digest.size_bytes
            # Symlink nodes are encoded as part of the directory serialization.
        return total

    def _get_identifier(self):
        """Return a '/'-separated identifier built from the parents' identifiers
        plus this directory's filename (or common_name when it has no filename)."""
        path = ""
        if self.parent:
            path = self.parent._get_identifier()
        if self.filename:
            path += "/" + self.filename
        else:
            path += "/" + self.common_name
        return path

    def __str__(self):
        return "[CAS:{}]".format(self._get_identifier())

    def _get_underlying_directory(self):
        """ There is no underlying directory for a CAS-backed directory, so
        throw an exception. """
        raise VirtualDirectoryError("_get_underlying_directory was called on a CAS-backed directory," +
                                    " which has no underlying directory.")

    # _get_digest():
    #
    # Return the Digest for this directory.
    #
    # If no digest is cached, the Directory proto is rebuilt from the index
    # (entries in sorted name order), stored in the CAS, and its digest cached.
    #
    # Returns:
    #   (Digest): The Digest protobuf object for the Directory protobuf
    #
    def _get_digest(self):
        if not self.__digest:
            # Create updated Directory proto
            pb2_directory = remote_execution_pb2.Directory()

            for name, entry in sorted(self.index.items()):
                if entry.type == _FileType.DIRECTORY:
                    dirnode = pb2_directory.directories.add()
                    dirnode.name = name

                    # Update digests for subdirectories in DirectoryNodes.
                    # No need to call entry.get_directory().
                    # If it hasn't been instantiated, digest must be up-to-date.
                    subdir = entry.buildstream_object
                    if subdir:
                        dirnode.digest.CopyFrom(subdir._get_digest())
                    else:
                        dirnode.digest.CopyFrom(entry.digest)
                elif entry.type == _FileType.REGULAR_FILE:
                    filenode = pb2_directory.files.add()
                    filenode.name = name
                    filenode.digest.CopyFrom(entry.digest)
                    filenode.is_executable = entry.is_executable
                elif entry.type == _FileType.SYMLINK:
                    symlinknode = pb2_directory.symlinks.add()
                    symlinknode.name = name
                    symlinknode.target = entry.target

            self.__digest = self.cas_cache.add_object(buffer=pb2_directory.SerializeToString())

        return self.__digest

    def _get_child_digest(self, *path):
        """Return the digest of the file or directory at *path*.

        Raises:
            VirtualDirectoryError: if the entry is a symlink (it has no digest)
            or an intermediate component cannot be descended into.
        """
        subdir = self.descend(*path[:-1])
        entry = subdir.index[path[-1]]
        if entry.type == _FileType.DIRECTORY:
            subdir = entry.buildstream_object
            if subdir:
                return subdir._get_digest()
            else:
                return entry.digest
        elif entry.type == _FileType.REGULAR_FILE:
            return entry.digest
        else:
            raise VirtualDirectoryError("Directory entry has no digest: {}".format(os.path.join(*path)))

    def _objpath(self, *path):
        """Return the CAS object path for the entry at *path*.

        The digest comes from _get_child_digest(). This means an instantiated
        subdirectory (whose IndexEntry.digest has been reset to None) is
        resolved via its own _get_digest(), and a symlink raises
        VirtualDirectoryError rather than passing None to objpath().
        """
        return self.cas_cache.objpath(self._get_child_digest(*path))

    def _exists(self, *path):
        """Return True if *path* names an entry (of any type) in this tree."""
        try:
            subdir = self.descend(*path[:-1])
            return path[-1] in subdir.index
        except VirtualDirectoryError:
            return False

    def __invalidate_digest(self):
        # Drop the cached digest and propagate upwards. Propagation stops at a
        # directory whose digest is already None; in that case its ancestors
        # were presumably invalidated when it was.
        if self.__digest:
            self.__digest = None
            if self.parent:
                self.parent.__invalidate_digest()

    def __add_files_to_result(self, *, path_prefix="", result):
        # Append every non-directory path in this tree (prefixed by
        # path_prefix) to result.files_written.
        for name, entry in self.index.items():
            # The destination filename, relative to the root where the import started
            relative_pathname = os.path.join(path_prefix, name)

            if entry.type == _FileType.DIRECTORY:
                subdir = self.descend(name)
                subdir.__add_files_to_result(path_prefix=relative_pathname, result=result)
            else:
                result.files_written.append(relative_pathname)