blob: c061a28e45e4d375f9fb5d8e4d00bc2b6033d2e5 [file] [log] [blame]
#
# Copyright (C) 2018 Bloomberg LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jim MacArthur <jim.macarthur@codethink.co.uk>
"""
CasBasedDirectory
=================
Implementation of the Directory class which backs onto a Merkle-tree based content
addressable storage system.
See also: :ref:`sandboxing`.
"""
import os
import stat
import tarfile as tarfilelib
from contextlib import contextmanager
from io import StringIO
from google.protobuf import timestamp_pb2
from .. import utils
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .directory import Directory, VirtualDirectoryError, _FileType
from ._filebaseddirectory import FileBasedDirectory
from ..utils import FileListResult, BST_ARBITRARY_TIMESTAMP
class IndexEntry:
    """A single named entry held in ``CasBasedDirectory.index``.

    An entry records its name and file type together with the data that
    belongs to that type: a digest, a symlink target, the executable flag
    and an optional mtime. A directory entry may additionally carry the
    ``CasBasedDirectory`` object created for it (``buildstream_object``).
    """

    def __init__(
        self,
        name,
        entrytype,
        *,
        digest=None,
        target=None,
        is_executable=False,
        buildstream_object=None,
        modified=False,
        mtime=None
    ):
        # Identity of the entry
        self.name = name
        self.type = entrytype
        # Type specific payload
        self.digest = digest
        self.target = target
        self.is_executable = is_executable
        self.mtime = mtime
        # Runtime state
        self.buildstream_object = buildstream_object
        self.modified = modified

    def get_directory(self, parent):
        """Return the directory object for this entry, creating it on first use.

        Args:
           parent: The directory containing this entry; its ``cas_cache`` is
                   reused and it becomes the parent of the created object.

        Once the object has been created the stored digest is dropped, and
        from then on the digest is obtained from the object itself.
        """
        if self.buildstream_object:
            return self.buildstream_object

        assert self.type == _FileType.DIRECTORY
        self.buildstream_object = CasBasedDirectory(
            parent.cas_cache, digest=self.digest, parent=parent, filename=self.name
        )
        self.digest = None
        return self.buildstream_object

    def get_digest(self):
        """Return the digest of this entry.

        When a directory object is attached, its digest is returned;
        otherwise the digest stored on the entry is returned (which may be
        ``None``).
        """
        if not self.buildstream_object:
            # regular file, symlink or directory without buildstream object
            return self.digest

        # directory with buildstream object
        return self.buildstream_object._get_digest()

    # clone():
    #
    # Create a copy of this entry which is not attached to any
    # CasBasedDirectory object. The copy carries the name, type, digest,
    # target, executable flag and mtime; `buildstream_object` and
    # `modified` are left at their defaults.
    #
    def clone(self) -> "IndexEntry":
        # For a directory with an attached object, get_digest() asks the
        # object for its digest, so the copy only needs the digest.
        current_digest = self.get_digest()
        return IndexEntry(
            self.name,
            self.type,
            digest=current_digest,
            target=self.target,
            is_executable=self.is_executable,
            mtime=self.mtime,
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, IndexEntry):
            return NotImplemented

        # Two entries are equivalent when all of the properties below
        # match; `modified` and `buildstream_object` are not compared.
        mine, theirs = (
            (e.name, e.type, e.target, e.is_executable, e.mtime, e.get_digest()) for e in (self, other)
        )
        return mine == theirs
# CasBasedDirectory intentionally doesn't call its superclass constructor,
# which is meant to be unimplemented.
# pylint: disable=super-init-not-called
class CasBasedDirectory(Directory):
"""
CAS-based directories can have two names; one is a 'common name' which has no effect
on functionality, and the 'filename'. If a CasBasedDirectory has a parent, then 'filename'
must be the name of an entry in the parent directory's index which points to this object.
This is used to inform a parent directory that it must update the given hash for this
object when this object changes.
Typically a top-level CasBasedDirectory will have a common_name and no filename, and
subdirectories will have a filename and no common_name. common_name can be used to identify
CasBasedDirectory objects in a log file, since they have no unique position in a file
system.
"""
# Two constants which define the separators used by the remote execution API.
_pb2_path_sep = "/"
_pb2_absolute_path_prefix = "/"
def __init__(self, cas_cache, *, digest=None, parent=None, common_name="untitled", filename=None):
self.filename = filename
self.common_name = common_name
self.cas_cache = cas_cache
self.__digest = digest
self.index = {}
self.parent = parent
self.__subtree_read_only = None
if digest:
self._populate_index(digest)
# _clear():
#
# Remove all entries from this directory.
#
def _clear(self):
self.__invalidate_digest()
self.index = {}
# _reset():
#
# Replace the contents of this directory with the entries from the specified
# directory digest.
#
# Args:
# digest (Digest): The digest of the replacement directory
#
def _reset(self, *, digest=None):
self._clear()
if digest:
self.__digest = digest
self._populate_index(digest)
def _populate_index(self, digest):
    """Fill the index from the Directory message stored under `digest`.

    The serialized message is read from the path reported by
    `cas_cache.objpath()`. Subdirectories, files and symlinks are added
    to the index in that order, and a "SubtreeReadOnly" node property,
    if present, is recorded.

    Raises:
       VirtualDirectoryError: if the object file does not exist
    """
    pb2_directory = remote_execution_pb2.Directory()
    try:
        with open(self.cas_cache.objpath(digest), "rb") as f:
            pb2_directory.ParseFromString(f.read())
    except FileNotFoundError as e:
        raise VirtualDirectoryError("Directory not found in local cache: {}".format(e)) from e

    for node_property in pb2_directory.node_properties.properties:
        if node_property.name == "SubtreeReadOnly":
            self.__subtree_read_only = node_property.value == "true"

    for dirnode in pb2_directory.directories:
        self.index[dirnode.name] = IndexEntry(dirnode.name, _FileType.DIRECTORY, digest=dirnode.digest)

    for filenode in pb2_directory.files:
        # The mtime is optional, only carry it over if it was recorded
        has_mtime = filenode.node_properties.HasField("mtime")
        self.index[filenode.name] = IndexEntry(
            filenode.name,
            _FileType.REGULAR_FILE,
            digest=filenode.digest,
            is_executable=filenode.is_executable,
            mtime=filenode.node_properties.mtime if has_mtime else None,
        )

    for linknode in pb2_directory.symlinks:
        self.index[linknode.name] = IndexEntry(linknode.name, _FileType.SYMLINK, target=linknode.target)
def _find_self_in_parent(self):
assert self.parent is not None
parent = self.parent
for (k, v) in parent.index.items():
if v.buildstream_object == self:
return k
return None
def _add_directory(self, name):
    """Create an empty subdirectory called `name` and return its object.

    The name must not already be present in the index. The new directory
    uses this directory's CAS cache and has this directory as its parent.
    """
    assert name not in self.index

    child = CasBasedDirectory(self.cas_cache, parent=self, filename=name)
    self.index[name] = IndexEntry(name, _FileType.DIRECTORY, buildstream_object=child)
    self.__invalidate_digest()

    return child
def _add_file(self, name, path, modified=False, can_link=False, properties=None):
    """Add the local file at `path` to the CAS and index it as `name`.

    Args:
       name: Name of the entry to create or replace in this directory
       path: Path of the file on the local filesystem
       modified: Whether to flag the entry as modified; an entry which
                 replaces an existing one is always flagged
       can_link: Accepted but not used by this implementation
       properties: Optional collection of property names; when it
                   contains "mtime" the file's mtime is recorded
    """
    digest = self.cas_cache.add_object(path=path)
    is_executable = os.access(path, os.X_OK)

    if properties and "mtime" in properties:
        mtime = timestamp_pb2.Timestamp()
        utils._get_file_protobuf_mtimestamp(mtime, path)
    else:
        mtime = None

    self.index[name] = IndexEntry(
        name,
        _FileType.REGULAR_FILE,
        digest=digest,
        is_executable=is_executable,
        modified=modified or name in self.index,
        mtime=mtime,
    )
    self.__invalidate_digest()
def _add_entry(self, entry: IndexEntry):
    """Store a clone of `entry` in the index under the entry's name."""
    copied = entry.clone()
    self.index[entry.name] = copied
    self.__invalidate_digest()
def _contains_entry(self, entry: IndexEntry) -> bool:
    """Tell whether the index holds an entry equal to `entry` under the same name."""
    existing = self.index.get(entry.name)
    return entry == existing
# _apply_changes():
#
# Apply changes from dir_a to dir_b to this directory. The use
# case for this is to merge changes between different workspace
# versions into a buildtree.
#
# If a change was made both to this directory, as well as between
# the given directories, it is applied, overwriting any changes to
# this directory. This is desirable because we want to keep user
# changes, however it may need to be re-considered for other use
# cases.
#
# We perform this computation this way, instead of with a _diff
# method and a subsequent _apply_diff, because it prevents leaking
# IndexEntry objects, which contain mutable references and may
# therefore cause problems if used outside of this class.
#
# Args:
# dir_a: The directory from which to start computing differences.
# dir_b: The directory whose changes to apply
#
def _apply_changes(self, dir_a: "CasBasedDirectory", dir_b: "CasBasedDirectory"):
# If the digests are the same, the directories are the same
# (child properties affect the digest). We can skip any work
# in such a case.
if dir_a._get_digest() == dir_b._get_digest():
return
def get_subdir(entry: IndexEntry, directory: CasBasedDirectory) -> CasBasedDirectory:
return directory.index[entry.name].get_directory(directory)
def is_dir_in(entry: IndexEntry, directory: CasBasedDirectory) -> bool:
return directory.index[entry.name].type == _FileType.DIRECTORY
# We first check which files were added, and add them to our
# directory.
for entry in dir_b.index.values():
if self._contains_entry(entry):
# We can short-circuit checking entries from b that
# already exist in our index.
continue
if not dir_a._contains_entry(entry):
if entry.name in self.index and is_dir_in(entry, self) and is_dir_in(entry, dir_b):
# If the entry changed, and is a directory in both
# the current and to-merge-into tree, we need to
# merge recursively.
# If the entry is not a directory in dir_a, we
# want to overwrite the file, but we need an empty
# directory for recursion.
if entry.name in dir_a.index and is_dir_in(entry, dir_a):
sub_a = get_subdir(entry, dir_a)
else:
sub_a = CasBasedDirectory(dir_a.cas_cache)
subdir = get_subdir(entry, self)
subdir._apply_changes(sub_a, get_subdir(entry, dir_b))
else:
# In any other case, we just add/overwrite the file/directory
self._add_entry(entry)
# We can't iterate and remove entries at the same time
to_remove = [entry for entry in dir_a.index.values() if entry.name not in dir_b.index]
for entry in to_remove:
self.remove(entry.name, recursive=True)
self.__invalidate_digest()
def _add_new_link_direct(self, name, target):
    """Create or replace the symlink entry `name` pointing at `target`.

    The entry is flagged as modified when it replaces an existing entry.
    """
    replaces_existing = name in self.index
    self.index[name] = IndexEntry(name, _FileType.SYMLINK, target=target, modified=replaces_existing)
    self.__invalidate_digest()
def remove(self, *path, recursive=False):
if len(path) > 1:
# Delegate remove to subdirectory
subdir = self.descend(*path[:-1])
subdir.remove(path[-1], recursive=recursive)
return
name = path[0]
self.__validate_path_component(name)
entry = self.index.get(name)
if not entry:
raise FileNotFoundError("{} not found in {}".format(name, str(self)))
if entry.type == _FileType.DIRECTORY and not recursive:
subdir = entry.get_directory(self)
if not subdir.is_empty():
raise VirtualDirectoryError("{} is not empty".format(str(subdir)))
del self.index[name]
self.__invalidate_digest()
def rename(self, src, dest):
    """Move the entry at path `src` to path `dest`.

    Both arguments are sequences of path components. The directories
    containing the source and the destination are resolved with
    descend(); the entry is removed from the former and a clone carrying
    the new name is added to the latter.
    """
    old_name, new_name = src[-1], dest[-1]

    srcdir = self.descend(*src[:-1])
    entry = srcdir._entry_from_path(old_name)

    # Resolve and validate the destination before touching the source
    destdir = self.descend(*dest[:-1])
    self.__validate_path_component(new_name)

    srcdir.remove(old_name, recursive=True)
    entry.name = new_name
    destdir._add_entry(entry)
def descend(self, *paths, create=False, follow_symlinks=False):
"""Descend one or more levels of directory hierarchy and return a new
Directory object for that directory.
Arguments:
* *paths (str): A list of strings which are all directory names.
* create (boolean): If this is true, the directories will be created if
they don't already exist.
Note: At the moment, creating a directory by descending does
not update this object in the CAS cache. However, performing
an import_files() into a subdirectory of any depth obtained by
descending from this object *will* cause this directory to be
updated and stored.
"""
current_dir = self
paths = list(paths)
for path in paths:
# Skip empty path segments
if not path:
continue
self.__validate_path_component(path)
entry = current_dir.index.get(path)
if entry:
if entry.type == _FileType.DIRECTORY:
current_dir = entry.get_directory(current_dir)
elif follow_symlinks and entry.type == _FileType.SYMLINK:
linklocation = entry.target
newpaths = linklocation.split(os.path.sep)
if os.path.isabs(linklocation):
current_dir = current_dir._find_root().descend(*newpaths, follow_symlinks=True)
else:
current_dir = current_dir.descend(*newpaths, follow_symlinks=True)
else:
error = "Cannot descend into {}, which is a '{}' in the directory {}"
raise VirtualDirectoryError(
error.format(path, current_dir.index[path].type, current_dir), reason="not-a-directory"
)
else:
if path == ".":
continue
if path == "..":
if current_dir.parent is not None:
current_dir = current_dir.parent
# In POSIX /.. == / so just stay at the root dir
continue
if create:
current_dir = current_dir._add_directory(path)
else:
error = "'{}' not found in {}"
raise VirtualDirectoryError(error.format(path, str(current_dir)), reason="directory-not-found")
return current_dir
def _check_replacement(self, name, relative_pathname, fileListResult):
""" Checks whether 'name' exists, and if so, whether we can overwrite it.
If we can, add the name to 'overwritten_files' and delete the existing entry.
Returns 'True' if the import should go ahead.
fileListResult.overwritten and fileListResult.ignore are updated depending
on the result. """
existing_entry = self.index.get(name)
if existing_entry is None:
return True
elif existing_entry.type == _FileType.DIRECTORY:
# If 'name' maps to a DirectoryNode, then there must be an entry in index
# pointing to another Directory.
subdir = existing_entry.get_directory(self)
if subdir.is_empty():
self.remove(name)
fileListResult.overwritten.append(relative_pathname)
return True
else:
# We can't overwrite a non-empty directory, so we just ignore it.
fileListResult.ignored.append(relative_pathname)
return False
else:
self.remove(name)
fileListResult.overwritten.append(relative_pathname)
return True
def _partial_import_cas_into_cas(self, source_directory, filter_callback, *, path_prefix="", origin=None, result):
    """ Import the entries of a CAS-based directory into this directory.

    Args:
       source_directory: The CasBasedDirectory to import entries from
       filter_callback: Optional callable taking a relative path; entries
                        for which it returns a false value are skipped
       path_prefix: Path of this directory relative to the directory where
                    the import started, used to build reported paths
       origin: The directory where the import started, defaults to this
               directory; a source subdirectory equal to it is skipped
       result: The result object whose `files_written`, `overwritten` and
               `ignored` lists are appended to

    Raises:
       VirtualDirectoryError: if a source subdirectory would have to be
                              merged into an existing non-directory entry
    """
    if origin is None:
        origin = self
    for name, entry in source_directory.index.items():
        # The destination filename, relative to the root where the import started
        relative_pathname = os.path.join(path_prefix, name)
        is_dir = entry.type == _FileType.DIRECTORY
        if is_dir:
            create_subdir = name not in self.index
            if create_subdir and not filter_callback:
                # If subdirectory does not exist yet and there is no filter,
                # we can import the whole source directory by digest instead
                # of importing each directory entry individually.
                subdir_digest = entry.get_digest()
                dest_entry = IndexEntry(name, _FileType.DIRECTORY, digest=subdir_digest)
                self.index[name] = dest_entry
                self.__invalidate_digest()
                # However, we still need to iterate over the directory entries
                # to fill in `result.files_written`.
                # Use source subdirectory object if it already exists,
                # otherwise create object for destination subdirectory.
                # This is based on the assumption that the destination
                # subdirectory is more likely to be modified later on
                # (e.g., by further import_files() calls).
                if entry.buildstream_object:
                    subdir = entry.buildstream_object
                else:
                    subdir = dest_entry.get_directory(self)
                subdir.__add_files_to_result(path_prefix=relative_pathname, result=result)
            else:
                src_subdir = source_directory.descend(name)
                if src_subdir == origin:
                    # Do not import the directory where the import started
                    continue
                try:
                    dest_subdir = self.descend(name, create=create_subdir)
                except VirtualDirectoryError:
                    # The existing destination entry could not be descended into
                    filetype = self.index[name].type
                    raise VirtualDirectoryError(
                        "Destination is a {}, not a directory: /{}".format(filetype, relative_pathname)
                    )
                dest_subdir._partial_import_cas_into_cas(
                    src_subdir, filter_callback, path_prefix=relative_pathname, origin=origin, result=result
                )
        if filter_callback and not filter_callback(relative_pathname):
            # `dest_subdir` is bound whenever this inner condition reaches it:
            # a filter being set rules out the import-by-digest branch above.
            if is_dir and create_subdir and dest_subdir.is_empty():
                # Complete subdirectory has been filtered out, remove it
                self.remove(name)
            # Entry filtered out, move to next
            continue
        if not is_dir:
            if self._check_replacement(name, relative_pathname, result):
                if entry.type == _FileType.REGULAR_FILE:
                    # _add_entry() stores a clone, so the flag is set on
                    # the entry that ended up in our own index.
                    self._add_entry(entry)
                    self.index[entry.name].modified = True
                else:
                    assert entry.type == _FileType.SYMLINK
                    self._add_new_link_direct(name=name, target=entry.target)
                result.files_written.append(relative_pathname)
def import_files(
    self,
    external_pathspec,
    *,
    filter_callback=None,
    report_written=True,
    update_mtime=None,
    can_link=False,
    properties=None
):
    """ Import the content of `external_pathspec` into this directory.

    `external_pathspec` may be a FileBasedDirectory, the path of a local
    directory, or a CasBasedDirectory. See superclass Directory for the
    arguments. Returns a FileListResult describing the import. """
    source = external_pathspec

    if isinstance(source, FileBasedDirectory):
        source = source._get_underlying_directory()

    if isinstance(source, str):
        # Import files from local filesystem by first importing complete
        # directory into CAS (using buildbox-casd) and then importing its
        # content into this CasBasedDirectory using CAS-to-CAS import
        # to write the report, handle possible conflicts (if the target
        # directory is not empty) and apply the optional filter.
        digest = self.cas_cache.import_directory(source, properties=properties)
        source = CasBasedDirectory(self.cas_cache, digest=digest)

    assert isinstance(source, CasBasedDirectory)

    result = FileListResult()
    self._partial_import_cas_into_cas(source, filter_callback, result=result)

    # TODO: No notice is taken of report_written or update_mtime.
    # Current behaviour is to fully populate the report, which is inefficient,
    # but still correct.
    return result
def import_single_file(self, external_pathspec, properties=None):
    """Import one file from the local filesystem into this directory.

    The entry is named after the basename of `external_pathspec`. An
    existing entry of that name is replaced if _check_replacement()
    allows it, in which case the new entry is flagged as modified.

    Args:
       external_pathspec (str): Path of the file on the local filesystem
       properties: Optional properties, passed on to _add_file()

    Returns:
       (FileListResult): The result of the import
    """
    result = FileListResult()
    name = os.path.basename(external_pathspec)

    if self._check_replacement(name, os.path.dirname(external_pathspec), result):
        # `result` is fresh, so anything listed as overwritten was replaced
        # by this very import. The list holds the path passed above, which
        # is not the basename, so it cannot be searched for the name.
        overwritten = bool(result.overwritten)
        self._add_file(name, external_pathspec, modified=overwritten, properties=properties)
        result.files_written.append(external_pathspec)

    return result
def set_deterministic_user(self):
    """ Nothing to do for a CAS-based directory.

    This operation would set the files in a directory to the current
    user's euid/egid, but we don't store user data, so it can be ignored.
    """
def export_files(self, to_directory, *, can_link=False, can_destroy=False):
"""Copies everything from this into to_directory, which must be the name
of a traditional filesystem directory.
Arguments:
to_directory (string): a path outside this directory object
where the contents will be copied to.
can_link (bool): Whether we can create hard links in to_directory
instead of copying.
can_destroy (bool): Whether we can destroy elements in this
directory to export them (e.g. by renaming them as the
target).
"""
self.cas_cache.checkout(to_directory, self._get_digest(), can_link=can_link)
def export_to_tar(self, tarfile, destination_dir, mtime=BST_ARBITRARY_TIMESTAMP):
    """Add the content of this directory, recursively, to a tar archive.

    Entries are added sorted by name, and each directory is added before
    its content.

    Args:
       tarfile: The open TarFile object to add members to
       destination_dir (str): Path inside the archive to place entries under
       mtime: The modification time to set on every member

    Raises:
       VirtualDirectoryError: if an entry has a type which can not be
                              exported
    """
    for filename, entry in sorted(self.index.items()):
        arcname = os.path.join(destination_dir, filename)
        if entry.type == _FileType.DIRECTORY:
            tarinfo = tarfilelib.TarInfo(arcname)
            tarinfo.mtime = mtime
            tarinfo.type = tarfilelib.DIRTYPE
            tarinfo.mode = 0o755
            tarfile.addfile(tarinfo)
            self.descend(filename).export_to_tar(tarfile, arcname, mtime)
        elif entry.type == _FileType.REGULAR_FILE:
            source_name = self.cas_cache.objpath(entry.digest)
            tarinfo = tarfilelib.TarInfo(arcname)
            tarinfo.mtime = mtime
            if entry.is_executable:
                tarinfo.mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
            tarinfo.size = os.path.getsize(source_name)
            with open(source_name, "rb") as f:
                tarfile.addfile(tarinfo, f)
        elif entry.type == _FileType.SYMLINK:
            tarinfo = tarfilelib.TarInfo(arcname)
            tarinfo.mtime = mtime
            tarinfo.mode = 0o777
            tarinfo.linkname = entry.target
            tarinfo.type = tarfilelib.SYMTYPE
            # A symlink member consists of its header only (the target is
            # stored in `linkname` and the size stays 0), so there is no
            # file object to pass along.
            tarfile.addfile(tarinfo)
        else:
            raise VirtualDirectoryError("can not export file type {} to tar".format(entry.type))
def _mark_changed(self):
""" It should not be possible to externally modify a CAS-based
directory at the moment."""
raise NotImplementedError()
def is_empty(self):
""" Return true if this directory has no files, subdirectories or links in it.
"""
return len(self.index) == 0
def _mark_directory_unmodified(self):
# Marks all entries in this directory and all child directories as unmodified.
for i in self.index.values():
i.modified = False
if i.type == _FileType.DIRECTORY and i.buildstream_object:
i.buildstream_object._mark_directory_unmodified()
def _mark_entry_unmodified(self, name):
# Marks an entry as unmodified. If the entry is a directory, it will
# recursively mark all its tree as unmodified.
self.index[name].modified = False
if self.index[name].buildstream_object:
self.index[name].buildstream_object._mark_directory_unmodified()
def mark_unmodified(self):
""" Marks all files in this directory (recursively) as unmodified.
If we have a parent, we mark our own entry as unmodified in that parent's
index.
"""
if self.parent:
self.parent._mark_entry_unmodified(self._find_self_in_parent())
else:
self._mark_directory_unmodified()
def _lightweight_resolve_to_index(self, path):
"""A lightweight function for transforming paths into IndexEntry
objects. This does not follow symlinks.
path: The string to resolve. This should be a series of path
components separated by the protocol buffer path separator
_pb2_path_sep.
Returns: the IndexEntry found, or None if any of the path components were not present.
"""
directory = self
path_components = path.split(CasBasedDirectory._pb2_path_sep)
for component in path_components[:-1]:
if component not in directory.index:
return None
if directory.index[component].type == _FileType.DIRECTORY:
directory = directory.index[component].get_directory(self)
else:
return None
return directory.index.get(path_components[-1], None)
def list_modified_paths(self):
"""Provide a list of relative paths which have been modified since the
last call to mark_unmodified.
Return value: List(str) - list of modified paths
"""
for p in self.list_relative_paths():
i = self._lightweight_resolve_to_index(p)
if i and i.modified:
yield p
def list_relative_paths(self):
"""Provide a list of all relative paths.
Yields:
(List(str)) - list of all files with relative paths.
"""
yield from self._list_prefixed_relative_paths()
def _list_prefixed_relative_paths(self, prefix=""):
"""Provide a list of all relative paths.
Arguments:
prefix (str): an optional prefix to the relative paths, this is
also emitted by itself.
Yields:
(List(str)) - list of all files with relative paths.
"""
file_list = list(filter(lambda i: i[1].type != _FileType.DIRECTORY, self.index.items()))
directory_list = filter(lambda i: i[1].type == _FileType.DIRECTORY, self.index.items())
if prefix != "":
yield prefix
for (k, v) in sorted(file_list):
yield os.path.join(prefix, k)
for (k, v) in sorted(directory_list):
subdir = v.get_directory(self)
yield from subdir._list_prefixed_relative_paths(prefix=os.path.join(prefix, k))
def walk(self):
"""Provide a list of dictionaries containing information about the files.
Yields:
info (dict) - a dictionary containing name, type and size of the files.
"""
yield from self._walk()
def _walk(self, prefix=""):
    """ Walk through the entries in sorted order, collecting the required data

    Every entry is reported with its "name" (joined onto the prefix) and
    "type". In addition:

    * regular files report "executable" and "size", the size of the
      file's content as recorded in its digest
    * symlinks report "target" and "size", the length of the target
    * directories report "size", the number of entries they hold, and
      are followed by their own content

    Arguments:
      prefix (str): an optional prefix to the relative paths.

    Yields:
      info (dict) - a dictionary containing name, type and size of the files.
    """
    for leaf in sorted(self.index):
        entry = self.index[leaf]
        name = os.path.join(prefix, leaf)
        info = {"name": name, "type": entry.type}

        if entry.type == _FileType.DIRECTORY:
            directory = entry.get_directory(self)
            info["size"] = len(directory.index)
            yield info
            yield from directory._walk(name)
            continue

        if entry.type == _FileType.REGULAR_FILE:
            info["executable"] = entry.is_executable
            # The size of this very file, not of the directory holding it
            info["size"] = entry.get_digest().size_bytes
        elif entry.type == _FileType.SYMLINK:
            info["target"] = entry.target
            info["size"] = len(entry.target)

        yield info
def get_size(self):
digest = self._get_digest()
total = digest.size_bytes
for i in self.index.values():
if i.type == _FileType.DIRECTORY:
subdir = i.get_directory(self)
total += subdir.get_size()
elif i.type == _FileType.REGULAR_FILE:
total += i.digest.size_bytes
# Symlink nodes are encoded as part of the directory serialization.
return total
def _get_identifier(self):
path = ""
if self.parent:
path = self.parent._get_identifier()
if self.filename:
path += "/" + self.filename
else:
path += "/" + self.common_name
return path
@contextmanager
def open_file(self, *path: str, mode: str = "r"):
subdir = self.descend(*path[:-1])
self.__validate_path_component(path[-1])
entry = subdir.index.get(path[-1])
if entry and entry.type != _FileType.REGULAR_FILE:
raise VirtualDirectoryError("{} in {} is not a file".format(path[-1], str(subdir)))
if mode not in ["r", "rb", "w", "wb", "w+", "w+b", "x", "xb", "x+", "x+b"]:
raise ValueError("Unsupported mode: `{}`".format(mode))
if "b" in mode:
encoding = None
else:
encoding = "utf-8"
if "r" in mode:
if not entry:
raise FileNotFoundError("{} not found in {}".format(path[-1], str(subdir)))
# Read-only access, allow direct access to CAS object
with open(self.cas_cache.objpath(entry.digest), mode, encoding=encoding) as f:
yield f
else:
if "x" in mode and entry:
raise FileExistsError("{} already exists in {}".format(path[-1], str(subdir)))
with utils._tempnamedfile(mode, encoding=encoding, dir=self.cas_cache.tmpdir) as f:
# Make sure the temporary file is readable by buildbox-casd
os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
yield f
# Import written temporary file into CAS
f.flush()
subdir._add_file(path[-1], f.name, modified=True)
def __str__(self):
return "[CAS:{}]".format(self._get_identifier())
def _get_underlying_directory(self):
    """ Always raises: a CAS-backed directory is not backed by a
    directory on the filesystem, so there is nothing to return. """
    message = (
        "_get_underlying_directory was called on a CAS-backed directory,"
        " which has no underlying directory."
    )
    raise VirtualDirectoryError(message)
def _find_root(self):
""" Finds the root of this directory tree by following 'parent' until there is
no parent. """
if self.parent:
return self.parent._find_root()
else:
return self
# _get_digest():
#
# Return the Digest for this directory.
#
# Returns:
# (Digest): The Digest protobuf object for the Directory protobuf
#
def _get_digest(self):
if not self.__digest:
# Create updated Directory proto
pb2_directory = remote_execution_pb2.Directory()
if self.__subtree_read_only is not None:
node_property = pb2_directory.node_properties.properties.add()
node_property.name = "SubtreeReadOnly"
node_property.value = "true" if self.__subtree_read_only else "false"
for name, entry in sorted(self.index.items()):
if entry.type == _FileType.DIRECTORY:
dirnode = pb2_directory.directories.add()
dirnode.name = name
# Update digests for subdirectories in DirectoryNodes.
# No need to call entry.get_directory().
# If it hasn't been instantiated, digest must be up-to-date.
subdir = entry.buildstream_object
if subdir:
dirnode.digest.CopyFrom(subdir._get_digest())
else:
dirnode.digest.CopyFrom(entry.digest)
elif entry.type == _FileType.REGULAR_FILE:
filenode = pb2_directory.files.add()
filenode.name = name
filenode.digest.CopyFrom(entry.digest)
filenode.is_executable = entry.is_executable
if entry.mtime is not None:
filenode.node_properties.mtime.CopyFrom(entry.mtime)
elif entry.type == _FileType.SYMLINK:
symlinknode = pb2_directory.symlinks.add()
symlinknode.name = name
symlinknode.target = entry.target
self.__digest = self.cas_cache.add_object(buffer=pb2_directory.SerializeToString())
return self.__digest
def _entry_from_path(self, *path, follow_symlinks=False):
subdir = self.descend(*path[:-1], follow_symlinks=follow_symlinks)
self.__validate_path_component(path[-1])
target = subdir.index.get(path[-1])
if target is None:
raise FileNotFoundError("{} not found in {}".format(path[-1], str(subdir)))
if follow_symlinks and target.type == _FileType.SYMLINK:
linklocation = target.target
newpath = linklocation.split(os.path.sep)
if os.path.isabs(linklocation):
return subdir._find_root()._entry_from_path(*newpath, follow_symlinks=True)
return subdir._entry_from_path(*newpath, follow_symlinks=True)
else:
return target
def exists(self, *path, follow_symlinks=False):
try:
self._entry_from_path(*path, follow_symlinks=follow_symlinks)
return True
except (VirtualDirectoryError, FileNotFoundError):
return False
def stat(self, *path, follow_symlinks=False):
    """Return an os.stat_result describing the entry addressed by `path`.

    The mode carries the file type, read permission for everyone, write
    permission for the owner and, for directories and executable files,
    execute permission for everyone. The size is the digest size for
    regular files, the length of the target for symlinks and 0 for
    directories. All three timestamps are set to the mtime of the entry,
    or to BST_ARBITRARY_TIMESTAMP if it has none.

    Raises:
       VirtualDirectoryError: if the entry has an unsupported file type
    """
    entry = self._entry_from_path(*path, follow_symlinks=follow_symlinks)

    st_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH

    if entry.type == _FileType.REGULAR_FILE:
        st_mode |= stat.S_IFREG
        st_size = entry.get_digest().size_bytes
    elif entry.type == _FileType.DIRECTORY:
        st_mode |= stat.S_IFDIR
        st_size = 0
    elif entry.type == _FileType.SYMLINK:
        st_mode |= stat.S_IFLNK
        st_size = len(entry.target)
    else:
        raise VirtualDirectoryError("Unsupported file type {}".format(entry.type))

    if entry.type == _FileType.DIRECTORY or entry.is_executable:
        st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    if entry.mtime is None:
        st_mtime = BST_ARBITRARY_TIMESTAMP
    else:
        st_mtime = utils._parse_protobuf_timestamp(entry.mtime)

    # (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
    return os.stat_result((st_mode, 0, 0, 1, 0, 0, st_size, st_mtime, st_mtime, st_mtime))
def file_digest(self, *path):
    """Return the hash of the digest of the regular file at `path`.

    Raises:
       VirtualDirectoryError: if the entry is not a regular file
    """
    entry = self._entry_from_path(*path)
    if entry.type == _FileType.REGULAR_FILE:
        return entry.digest.hash

    raise VirtualDirectoryError("Unsupported file type for digest: {}".format(entry.type))
def readlink(self, *path):
    """Return the target of the symlink at `path`.

    Raises:
       VirtualDirectoryError: if the entry is not a symlink
    """
    entry = self._entry_from_path(*path)
    if entry.type == _FileType.SYMLINK:
        return entry.target

    raise VirtualDirectoryError("Unsupported file type for readlink: {}".format(entry.type))
def __iter__(self):
yield from self.index.keys()
def _set_subtree_read_only(self, read_only):
self.__subtree_read_only = read_only
self.__invalidate_digest()
def __invalidate_digest(self):
if self.__digest:
self.__digest = None
if self.parent:
self.parent.__invalidate_digest()
def __add_files_to_result(self, *, path_prefix="", result):
    """Append the path of every non-directory entry below this directory
    to `result.files_written`.

    Args:
       path_prefix (str): Prefix joined onto the reported paths
       result: The result object whose `files_written` list is extended
    """
    for name, entry in self.index.items():
        # The destination filename, relative to the root where the import started
        relative_pathname = os.path.join(path_prefix, name)

        if entry.type != _FileType.DIRECTORY:
            result.files_written.append(relative_pathname)
            continue

        self.descend(name).__add_files_to_result(path_prefix=relative_pathname, result=result)
def __validate_path_component(self, path):
if "/" in path:
raise VirtualDirectoryError("Invalid path component: '{}'".format(path))