blob: cf234eaca36e8bed4bd4cc9e1b8aad5c1504e521 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2018 Bloomberg Finance LP
# Copyright (C) 2022 Codethink Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# Jim MacArthur <jim.macarthur@codethink.co.uk>
# Tristan van Berkom <tristan.vanberkom@codethink.co.uk>
import os
import shutil
import stat
from contextlib import contextmanager
from tarfile import TarFile
from typing import Callable, Optional, Union, List, IO, Iterator
from .directory import Directory, DirectoryError, FileType, FileStat
from .. import utils
from ..utils import BST_ARBITRARY_TIMESTAMP
from ..utils import FileListResult
# FileBasedDirectory intentionally doesn't call its superclass constructor,
# which is meant to be unimplemented.
# pylint: disable=super-init-not-called
class FileBasedDirectory(Directory):
    """A Directory implementation backed by a directory on the local filesystem.

    Paths passed to the public methods are split on "/" and resolved one
    component at a time starting from this directory; ".." never climbs
    above the directory the tree was originally opened from.
    """

    def __init__(self, external_directory: str, *, parent: Optional["FileBasedDirectory"] = None) -> None:
        # Filesystem path of the directory represented by this object
        self.__external_directory: str = external_directory
        # Directory this one was opened from; None marks the root of the tree
        self.__parent: Optional[FileBasedDirectory] = parent

    def __iter__(self) -> Iterator[str]:
        """Yield the names of the entries directly inside this directory."""
        yield from os.listdir(self.__external_directory)

    def __len__(self) -> int:
        """Return the number of entries directly inside this directory.

        Because no ``__bool__`` is defined, this also determines truthiness:
        an empty directory is falsy.
        """
        return len(os.listdir(self.__external_directory))

    def __str__(self) -> str:
        # This returns the whole path (since we don't know where the directory started)
        # which exposes the sandbox directory; we will have to assume for the time being
        # that people will not abuse __str__.
        return self.__external_directory

    #############################################################
    #               Implementation of Public API                #
    #############################################################
    def open_directory(
        self, path: str, *, create: bool = False, follow_symlinks: bool = False
    ) -> "FileBasedDirectory":
        """Open the subdirectory at `path`, relative to this directory.

        Args:
           path: "/"-separated path of the subdirectory
           create: Create missing path components instead of failing
           follow_symlinks: Resolve symlinks met along the way within this tree

        Returns:
           The FileBasedDirectory for the requested path

        Raises:
           DirectoryError: If a component is missing (and `create` is False)
                           or is not a directory
        """
        self._validate_path(path)
        return self.__open_directory(path.split("/"), create=create, follow_symlinks=follow_symlinks)

    def import_single_file(self, external_pathspec: str) -> FileListResult:
        """Copy a single file from the host filesystem into this directory.

        The file keeps its basename. If something already exists under that
        name the copy is skipped and the destination path is recorded in the
        `ignored` list of the result.

        Args:
           external_pathspec: Filesystem path of the file to import

        Returns:
           A FileListResult describing what was ignored
        """
        dstpath = os.path.join(self.__external_directory, os.path.basename(external_pathspec))
        result = FileListResult()

        # lexists() rather than exists(): a dangling symlink at the destination
        # must count as "already there", otherwise the copy below would be
        # written through the link to wherever it points.
        if os.path.lexists(dstpath):
            result.ignored.append(dstpath)
        else:
            shutil.copyfile(external_pathspec, dstpath, follow_symlinks=False)
        return result

    # Add a directory entry deterministically to a tar file
    #
    # This function takes extra steps to ensure the output is deterministic.
    # First, it sorts the results of os.listdir() to ensure the ordering of
    # the files in the archive is the same. Second, it sets a fixed
    # timestamp for each entry. See also https://bugs.python.org/issue24465.
    def export_to_tar(self, tarfile: TarFile, destination_dir: str, mtime: int = BST_ARBITRARY_TIMESTAMP) -> None:
        """Recursively add the content of this directory to `tarfile`.

        Args:
           tarfile: The open TarFile to add entries to
           destination_dir: Path prefix of the entries inside the archive
           mtime: Timestamp stamped on every entry
        """
        # We need directories here, including non-empty ones,
        # so list_relative_paths is not used.
        for filename in sorted(os.listdir(self.__external_directory)):
            source_name = os.path.join(self.__external_directory, filename)
            arcname = os.path.join(destination_dir, filename)

            # Normalize the metadata so that ownership and timestamps of the
            # host filesystem do not leak into the archive
            tarinfo = tarfile.gettarinfo(source_name, arcname)
            tarinfo.mtime = mtime
            tarinfo.uid = 0
            tarinfo.gid = 0
            tarinfo.uname = ""
            tarinfo.gname = ""

            if tarinfo.isreg():
                with open(source_name, "rb") as f:
                    tarfile.addfile(tarinfo, f)
            else:
                # Directories, symlinks and other entries carry no payload
                tarfile.addfile(tarinfo)
                if tarinfo.isdir():
                    self.open_directory(filename).export_to_tar(tarfile, arcname, mtime)

    def list_relative_paths(self) -> Iterator[str]:
        """Yield the paths reported by utils.list_relative_paths() for this directory."""
        yield from utils.list_relative_paths(self.__external_directory)

    def exists(self, path: str, *, follow_symlinks: bool = False) -> bool:
        """Return True if stat() succeeds for `path`, False if it raises DirectoryError."""
        try:
            self.stat(path, follow_symlinks=follow_symlinks)
        except DirectoryError:
            return False
        return True

    def stat(self, path: str, *, follow_symlinks: bool = False) -> FileStat:
        """Return the FileStat of the entry at `path`.

        Args:
           path: "/"-separated path relative to this directory
           follow_symlinks: Resolve symlinks (in parent components and in the
                            final component) within this directory tree

        Raises:
           DirectoryError: If the entry cannot be accessed
        """
        self._validate_path(path)
        paths = path.split("/")

        subdir = self.__open_directory(paths[:-1], follow_symlinks=follow_symlinks)
        newpath = os.path.join(subdir.__external_directory, paths[-1])

        try:
            st = os.lstat(newpath)
        except OSError as e:
            raise DirectoryError("Error accessing path '{}': {}".format(newpath, e)) from e

        if not (follow_symlinks and stat.S_ISLNK(st.st_mode)):
            return self.__convert_filestat(st)

        # Resolve the symlink ourselves so that absolute targets are
        # interpreted relative to the root of this tree, not the host's "/"
        linklocation = os.readlink(newpath)
        if os.path.isabs(linklocation):
            # Call stat on the root, remove the leading "/"
            return subdir.__find_root().stat(linklocation[1:], follow_symlinks=True)
        return subdir.stat(linklocation, follow_symlinks=True)

    @contextmanager
    def open_file(self, path: str, *, mode: str = "r") -> Iterator[IO]:
        """Context manager yielding an open file object for `path`.

        Text modes use UTF-8. Read modes open the file directly; all other
        modes write through utils.save_file_atomic().

        Args:
           path: "/"-separated path relative to this directory
           mode: One of "r", "rb", "w", "wb", "w+", "w+b", "x", "xb", "x+", "x+b"

        Raises:
           ValueError: If `mode` is not one of the supported modes
           DirectoryError: If an exclusive ("x") mode is requested and the
                           file already exists
        """
        self._validate_path(path)
        paths = path.split("/")

        # Use open_directory() to avoid following symlinks (potentially escaping the sandbox)
        subdir = self.__open_directory(paths[:-1])
        newpath = os.path.join(subdir.__external_directory, paths[-1])

        if mode not in ["r", "rb", "w", "wb", "w+", "w+b", "x", "xb", "x+", "x+b"]:
            raise ValueError("Unsupported mode: `{}`".format(mode))

        encoding = None if "b" in mode else "utf-8"

        if "r" in mode:
            with open(newpath, mode=mode, encoding=encoding) as f:
                yield f
        else:
            if "x" in mode:
                # This check is not atomic, however, we're operating with a
                # single thread in a private directory tree.
                #
                # The check must use the last path *component* (paths[-1]),
                # not the last character of the path string.
                if subdir.exists(paths[-1]):
                    raise DirectoryError("{} already exists in {}".format(paths[-1], str(subdir)))
                # Exclusivity was checked above, hand a plain write mode
                # to save_file_atomic()
                mode = "w" + mode[1:]

            with utils.save_file_atomic(newpath, mode=mode, encoding=encoding) as f:
                yield f

    def file_digest(self, path: str) -> str:
        """Return utils.sha256sum() of the file at `path`.

        Raises:
           DirectoryError: If `path` exists but is not a regular file
        """
        self._validate_path(path)
        paths = path.split("/")

        # Use open_directory() to avoid following symlinks (potentially escaping the sandbox)
        subdir = self.__open_directory(paths[:-1])
        if subdir.exists(paths[-1]) and not subdir.isfile(paths[-1]):
            raise DirectoryError("Unsupported file type for digest")

        newpath = os.path.join(subdir.__external_directory, paths[-1])
        return utils.sha256sum(newpath)

    def readlink(self, path: str) -> str:
        """Return the target of the symlink at `path`.

        Raises:
           DirectoryError: If `path` exists but is not a symlink
        """
        self._validate_path(path)
        paths = path.split("/")

        # Use open_directory() to avoid following symlinks (potentially escaping the sandbox)
        subdir = self.__open_directory(paths[:-1])
        if subdir.exists(paths[-1]) and not subdir.islink(paths[-1]):
            raise DirectoryError("Unsupported file type for readlink")

        newpath = os.path.join(subdir.__external_directory, paths[-1])
        return os.readlink(newpath)

    def remove(self, path: str, *, recursive: bool = False) -> None:
        """Remove the entry at `path`.

        Args:
           path: "/"-separated path relative to this directory
           recursive: Remove a directory together with its content

        Raises:
           DirectoryError: If a file, or a directory with `recursive` False,
                           cannot be removed
        """
        self._validate_path(path)
        paths = path.split("/")

        # Use open_directory() to avoid following symlinks (potentially escaping the sandbox)
        subdir = self.__open_directory(paths[:-1])
        newpath = os.path.join(subdir.__external_directory, paths[-1])

        is_directory = subdir.isdir(paths[-1])
        if is_directory and recursive:
            shutil.rmtree(newpath)
            return

        try:
            if is_directory:
                os.rmdir(newpath)
            else:
                os.unlink(newpath)
        except OSError as e:
            raise DirectoryError("Error removing '{}': {}".format(newpath, e)) from e

    def rename(self, src: str, dest: str) -> None:
        """Rename `src` to `dest`, first removing an existing `dest`.

        Args:
           src: "/"-separated source path relative to this directory
           dest: "/"-separated destination path relative to this directory

        Raises:
           DirectoryError: If the existing destination cannot be removed or
                           the rename fails
        """
        self._validate_path(src)
        self._validate_path(dest)
        src_paths = src.split("/")
        dest_paths = dest.split("/")

        # Use open_directory() to avoid following symlinks (potentially escaping the sandbox)
        srcdir = self.__open_directory(src_paths[:-1])
        destdir = self.__open_directory(dest_paths[:-1])
        srcpath = os.path.join(srcdir.__external_directory, src_paths[-1])
        destpath = os.path.join(destdir.__external_directory, dest_paths[-1])

        if destdir.exists(dest_paths[-1]):
            destdir.remove(dest_paths[-1])

        try:
            os.rename(srcpath, destpath)
        except OSError as e:
            raise DirectoryError("Error renaming '{}' -> '{}': {}".format(srcpath, destpath, e)) from e

    #############################################################
    #             Implementation of Internal API                #
    #############################################################

    # _import_files()
    #
    # Import the content of a filesystem path or of another Directory
    # into this directory.
    #
    # Args:
    #    external_pathspec: A filesystem path, or a Directory to import from
    #    filter_callback: Optional predicate on relative paths; paths for which
    #                     it returns False are not imported
    #    update_mtime: Optional timestamp to set on the imported files
    #    properties: Accepted for interface compatibility, unused here
    #    collect_result: Accepted for interface compatibility, unused here
    #
    # Returns:
    #    A FileListResult describing the import
    #
    def _import_files(
        self,
        external_pathspec: Union[Directory, str],
        *,
        filter_callback: Optional[Callable[[str], bool]] = None,
        update_mtime: Optional[float] = None,
        properties: Optional[List[str]] = None,
        collect_result: bool = True
    ) -> FileListResult:

        # See if we can get a source directory to copy from
        source_directory: Optional[str] = None
        if isinstance(external_pathspec, str):
            source_directory = external_pathspec
        elif isinstance(external_pathspec, Directory):
            try:
                source_directory = external_pathspec._get_underlying_directory()
            except DirectoryError:
                # No underlying filesystem directory, handled as an
                # abstract Directory below
                pass

        if source_directory:
            #
            # We've got a source directory to copy from
            #
            import_result = utils.copy_files(
                source_directory,
                self.__external_directory,
                filter_callback=filter_callback,
                ignore_missing=False,
                report_written=True,
            )

            # Compare against None so that a timestamp of 0 is honoured too
            if update_mtime is not None:
                for f in import_result.files_written:
                    os.utime(os.path.join(self.__external_directory, f), times=(update_mtime, update_mtime))
        else:
            #
            # We're dealing with an abstract Directory object
            #
            assert isinstance(external_pathspec, Directory)

            def copy_action(src_path, dest_path, mtime, result):
                utils.safe_copy(src_path, dest_path, result=result)
                utils._set_file_mtime(dest_path, mtime)

            import_result = FileListResult()
            self.__import_files_from_directory(
                external_pathspec,
                copy_action,
                filter_callback,
                update_mtime=update_mtime,
                result=import_result,
            )

        return import_result

    # _export_files()
    #
    # Export the content of this directory to a filesystem location.
    #
    # Args:
    #    to_directory: Filesystem path to export to
    #    can_link: Use utils.link_files() instead of utils.copy_files()
    #    can_destroy: Allow moving this directory itself to `to_directory`
    #
    def _export_files(self, to_directory: str, *, can_link: bool = False, can_destroy: bool = False) -> None:
        if can_destroy:
            # Try a simple rename of the sandbox root; if that
            # doesn't cut it, then do the regular link files code path
            try:
                os.rename(self.__external_directory, to_directory)
                return
            except OSError:
                # Proceed using normal link/copy
                pass

        os.makedirs(to_directory, exist_ok=True)
        if can_link:
            utils.link_files(self.__external_directory, to_directory)
        else:
            utils.copy_files(self.__external_directory, to_directory)

    # _set_deterministic_user()
    #
    # Delegates to utils._set_deterministic_user() for this directory.
    #
    def _set_deterministic_user(self) -> None:
        utils._set_deterministic_user(self.__external_directory)

    # _get_underlying_path()
    #
    # Returns the filesystem path of `filename` inside this directory.
    #
    def _get_underlying_path(self, filename: str) -> str:
        return os.path.join(self.__external_directory, filename)

    # _get_underlying_directory()
    #
    # Returns the filesystem path of this directory.
    #
    def _get_underlying_directory(self) -> str:
        return self.__external_directory

    # _get_size()
    #
    # Returns the result of utils._get_dir_size() for this directory.
    #
    def _get_size(self) -> int:
        return utils._get_dir_size(self.__external_directory)

    #############################################################
    #                      Private methods                      #
    #############################################################

    # __open_directory()
    #
    # Walk the given path components starting at this directory.
    #
    # Args:
    #    paths: The path components to descend through, in order
    #    create: Create missing components instead of failing
    #    follow_symlinks: Resolve symlinks, treating absolute targets as
    #                     relative to the root of this tree
    #
    # Returns:
    #    The FileBasedDirectory reached after the last component
    #
    # Raises:
    #    DirectoryError: If a component is missing (and `create` is False)
    #                    or is neither a directory nor a followed symlink
    #
    def __open_directory(
        self, paths: List[str], *, create: bool = False, follow_symlinks: bool = False
    ) -> "FileBasedDirectory":
        current_dir = self

        for path in paths:
            # Skip empty path segments and references to the current directory
            if not path or path == ".":
                continue

            if path == "..":
                if current_dir.__parent is not None:
                    current_dir = current_dir.__parent
                # In POSIX /.. == / so just stay at the root dir
                continue

            new_path = os.path.join(current_dir.__external_directory, path)

            try:
                st = os.lstat(new_path)
            except FileNotFoundError as e:
                if not create:
                    raise DirectoryError("Cannot open '{}': '{}' does not exist".format(path, new_path)) from e
                os.mkdir(new_path)
                current_dir = FileBasedDirectory(new_path, parent=current_dir)
                continue

            if stat.S_ISDIR(st.st_mode):
                current_dir = FileBasedDirectory(new_path, parent=current_dir)
            elif follow_symlinks and stat.S_ISLNK(st.st_mode):
                linklocation = os.readlink(new_path)
                newpaths = linklocation.split(os.path.sep)
                if os.path.isabs(linklocation):
                    # Absolute targets are resolved from the root of this tree
                    current_dir = current_dir.__find_root().__open_directory(newpaths, follow_symlinks=True)
                else:
                    current_dir = current_dir.__open_directory(newpaths, follow_symlinks=True)
            else:
                raise DirectoryError(
                    "Cannot open '{}': '{}' is not a directory".format(path, new_path),
                    reason="not-a-directory",
                )

        return current_dir

    # __convert_filestat()
    #
    # Convert an os.stat_result into a FileStat
    #
    def __convert_filestat(self, st: os.stat_result) -> FileStat:
        # Remains 0 for anything which is not a regular file, directory or symlink
        file_type: int = 0

        if stat.S_ISREG(st.st_mode):
            file_type = FileType.REGULAR_FILE
        elif stat.S_ISDIR(st.st_mode):
            file_type = FileType.DIRECTORY
        elif stat.S_ISLNK(st.st_mode):
            file_type = FileType.SYMLINK

        # If any of the executable bits are set, lets call it executable
        executable = bool(st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))

        return FileStat(file_type, executable=executable, size=st.st_size, mtime=st.st_mtime)

    # __find_root()
    #
    # Finds the root of this directory tree by following 'parent' until there is
    # no parent.
    #
    def __find_root(self) -> "FileBasedDirectory":
        root = self
        # Compare against None explicitly: truth-testing a FileBasedDirectory
        # calls __len__(), which lists the directory on disk.
        while root.__parent is not None:
            root = root.__parent
        return root

    # __import_files_from_directory()
    #
    # Import files from a CAS-based directory.
    #
    # Args:
    #    source_directory: The Directory to import from
    #    actionfunc: Called as actionfunc(src_path, dest_path, mtime, result)
    #                to transfer each regular file
    #    filter_callback: Optional predicate on relative paths; entries for
    #                     which it returns False are skipped
    #    path_prefix: Path of this directory relative to where the import started
    #    update_mtime: Timestamp to use instead of the source file's own mtime
    #    result: The FileListResult to record the outcome in
    #
    # Raises:
    #    DirectoryError: If a source subdirectory collides with a
    #                    non-directory in the destination
    #
    def __import_files_from_directory(
        self,
        source_directory: Directory,
        actionfunc: Callable[[str, str, float, FileListResult], None],
        filter_callback: Optional[Callable[[str], bool]] = None,
        *,
        path_prefix: str = "",
        update_mtime: Optional[float] = None,
        result: FileListResult
    ) -> None:

        # Iterate over entries in the source directory
        for name in source_directory:

            # The destination filename, relative to the root where the import started
            relative_pathname = os.path.join(path_prefix, name)

            # The full destination path
            dest_path = os.path.join(self.__external_directory, name)

            is_dir = source_directory.isdir(name)

            if is_dir:
                src_subdir = source_directory.open_directory(name)

                try:
                    create_subdir = not os.path.lexists(dest_path)
                    dest_subdir = self.open_directory(name, create=create_subdir)
                except DirectoryError as e:
                    raise DirectoryError("Destination is not a directory: /{}".format(relative_pathname)) from e

                # Subdirectories are always descended into; the filter is
                # applied to the entries found inside them
                dest_subdir.__import_files_from_directory(
                    src_subdir,
                    actionfunc,
                    filter_callback,
                    path_prefix=relative_pathname,
                    result=result,
                    update_mtime=update_mtime,
                )

            if filter_callback and not filter_callback(relative_pathname):
                # `not dest_subdir` is true when the directory is empty on disk
                if is_dir and create_subdir and not dest_subdir:
                    # Complete subdirectory has been filtered out, remove it
                    os.rmdir(dest_subdir.__external_directory)

                # Filename filtered out, move to next
                continue

            if is_dir:
                continue

            if os.path.lexists(dest_path):
                # Collect overlaps
                if not os.path.isdir(dest_path):
                    result.overwritten.append(relative_pathname)

                if not utils.safe_remove(dest_path):
                    result.ignored.append(relative_pathname)
                    continue

            if source_directory.isfile(name):
                src_path = source_directory._get_underlying_path(name)
                filestat = source_directory.stat(name)

                # hardlink files on request, they wont have their mtimes
                # updated if hardlinking is performed
                mtime = update_mtime
                if mtime is None:
                    mtime = filestat.mtime
                actionfunc(src_path, dest_path, mtime, result)

                if filestat.executable:
                    os.chmod(
                        dest_path,
                        stat.S_IRUSR
                        | stat.S_IWUSR
                        | stat.S_IXUSR
                        | stat.S_IRGRP
                        | stat.S_IXGRP
                        | stat.S_IROTH
                        | stat.S_IXOTH,
                    )
                result.files_written.append(relative_pathname)

            elif source_directory.islink(name):
                link_target = source_directory.readlink(name)
                os.symlink(link_target, dest_path)
                result.files_written.append(relative_pathname)