blob: 549e12e484a056df379b5a7ac678fb267c19399d [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator
from . import _cachekey
from ._exceptions import SkipJob
from ._context import Context
from ._protos.buildstream.v2 import source_pb2
from .plugin import Plugin
from .storage._casbaseddirectory import CasBasedDirectory
if TYPE_CHECKING:
from typing import List
# pylint: disable=cyclic-import
from .source import Source
from ._project import Project
# pylint: enable=cyclic-import
# An ElementSources object represents the combined sources of an element.
class ElementSources:
    # ElementSources()
    #
    # Args:
    #    context (Context): Provides the source cache, the element sources
    #                       cache and the CAS cache used by this object
    #    project (Project): The project returned by get_project()
    #    plugin (Plugin): The plugin handed to the element sources cache when
    #                     pushing and pulling, and named in preflight errors
    #
    def __init__(self, context: "Context", project: "Project", plugin: "Plugin"):

        self._context = context
        self._project = project
        self._plugin = plugin

        self._sources = []  # type: List[Source]
        self._sourcecache = context.sourcecache  # Source cache
        self._elementsourcescache = context.elementsourcescache  # Cache of staged element sources
        self._is_resolved = False  # Whether the source is fully resolved or not
        self._cached = None  # If the sources are known to be successfully cached in CAS
        self._cache_key = None  # Our cached cache key
        self._proto = None  # The cached Source proto

    # get_project():
    #
    # Return the project associated with this object
    #
    def get_project(self):
        return self._project

    # add_source():
    #
    # Append source to this list of element sources.
    #
    # Args:
    #    source (Source): The source to add
    #
    def add_source(self, source):
        self._sources.append(source)

    # sources():
    #
    # A generator function to enumerate the element sources
    #
    # Yields:
    #    Source: The individual sources, in the order they were added
    #
    def sources(self) -> Iterator["Source"]:
        yield from self._sources

    # track():
    #
    # Calls track() on the Element sources
    #
    # Args:
    #    workspace: If truthy, a warning is issued for every source whose
    #               reference changed, as the new reference will be unused
    #
    # Raises:
    #    SourceError: If one of the element sources has an error
    #    SkipJob: If no source produced a reference
    #
    # Returns:
    #    (list): A list of (source unique id, new reference, whether the
    #            reference changed) tuples
    #
    def track(self, workspace):
        refs = []
        for source in self._sources:
            old_ref = source.get_ref()

            if source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK:
                with self._stage_previous_sources(source) as staging_directory:
                    new_ref = source._track(previous_sources_dir=staging_directory)
            else:
                new_ref = source._track()

            changed = old_ref != new_ref
            refs.append((source._unique_id, new_ref, changed))

            # Complimentary warning that the new ref will be unused.
            if changed and workspace:
                detail = (
                    "This source has an open workspace.\n"
                    "To start using the new reference, please close the existing workspace."
                )
                source.warn("Updated reference will be ignored as source has open workspace", detail=detail)

        # Sources which do not implement track() will return None, produce
        # a SKIP message in the UI if all sources produce None
        #
        if all(ref is None for _, ref, _ in refs):
            raise SkipJob("Element sources are not trackable")

        return refs

    # stage_and_cache():
    #
    # Stage the element sources to a directory in CAS, store the resulting
    # Source proto in the element sources cache and mark the sources as cached.
    #
    def stage_and_cache(self):
        vdir = self._stage()

        source_proto = source_pb2.Source()
        source_proto.files.CopyFrom(vdir._get_digest())

        self._elementsourcescache.store_proto(self, source_proto)

        self._proto = source_proto
        self._cached = True

    # get_files():
    #
    # Get a virtual directory for the staged source files
    #
    # Returns:
    #    (Directory): The virtual directory object
    #
    def get_files(self):
        # Assert sources are cached
        assert self.cached()

        cas = self._context.get_cascache()
        return CasBasedDirectory(cas, digest=self._proto.files)

    # fetch_done()
    #
    # Indicates that fetching the sources for this element has been done.
    #
    # Args:
    #    fetched_original (bool): Whether the original sources had been asked (and fetched) or not
    #
    def fetch_done(self, fetched_original):
        # The staged sources are expected to be in the element sources cache by now
        self._proto = self._elementsourcescache.load_proto(self)
        assert self._proto
        self._cached = True

        for source in self._sources:
            source._fetch_done(fetched_original)

    # push()
    #
    # Push the element's sources.
    #
    # Returns:
    #    (bool): True if the remote was updated, False if it already existed
    #            and no update was required
    #
    def push(self):
        pushed = False

        for source in self.sources():
            # Sources which depend on previous sources are not stored in the
            # CAS-based source cache on their own, so they are skipped here
            if source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH or source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
                continue

            if self._sourcecache.contains(source) and self._sourcecache.push(source):
                pushed = True

        # The combined, staged sources are always offered for pushing
        if self._elementsourcescache.push(self, self._plugin):
            pushed = True

        return pushed

    # init_workspace():
    #
    # Initialises a new workspace from the element sources.
    #
    # Args:
    #    directory (str): Path of the workspace to init
    #
    def init_workspace(self, directory: str):
        for source in self.sources():
            # Strip leading separators the same way _stage() does: os.path.join()
            # would otherwise discard `directory` and escape the workspace.
            subdir = source._directory.lstrip(os.path.sep) if source._directory else ""
            srcdir = os.path.join(directory, subdir) if subdir else directory

            os.makedirs(srcdir, exist_ok=True)

            source._init_workspace(srcdir)

    # fetch():
    #
    # Fetch the combined or individual element sources.
    #
    # Raises:
    #    SourceError: If one of the element sources has an error
    #
    def fetch(self):
        if self._cached is None:
            self.query_cache()

        if self.cached():
            return

        # Try to fetch staged sources from remote source cache
        if self._elementsourcescache.has_fetch_remotes() and self._elementsourcescache.pull(self, self._plugin):
            self.fetch_done(False)
            return

        # Otherwise, fetch individual sources
        self.fetch_sources()

    # fetch_sources():
    #
    # Fetch the individual element sources.
    #
    # Args:
    #    fetch_original (bool): Always fetch original source
    #    stop (Source): Only fetch sources listed before this source
    #
    # Raises:
    #    SourceError: If one of the element sources has an error
    #
    def fetch_sources(self, *, fetch_original=False, stop=None):
        for source in self._sources:
            if source == stop:
                break

            if (
                fetch_original
                or source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH
                or source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE
            ):
                # Source depends on previous sources, it cannot be stored in
                # CAS-based source cache on its own. Fetch original source
                # if it's not in the plugin-specific cache yet.
                if not source._is_cached():
                    self._fetch_original_source(source)
            else:
                self._fetch_source(source)

    # get_unique_key():
    #
    # Return something which uniquely identifies the combined sources of the
    # element. The sources must be resolved before calling this.
    #
    # Returns:
    #    (str, list, dict): A string, list or dictionary as unique identifier
    #
    def get_unique_key(self):
        assert self.is_resolved()

        result = []

        for source in self._sources:
            key_dict = {"key": source._get_unique_key(), "name": source.get_kind()}
            # The staging directory only contributes to the key when one is set
            if source._directory:
                key_dict["directory"] = source._directory
            result.append(key_dict)

        return result

    # get_cache_key():
    #
    # Return cache key for the combined element sources
    #
    # Returns:
    #    The cache key, or None if update_resolved_state() has not generated it yet
    #
    def get_cache_key(self):
        return self._cache_key

    # get_brief_display_key()
    #
    # Returns an abbreviated cache key for display purposes
    #
    # Returns:
    #    (str): An abbreviated hex digest cache key for this Element
    #
    def get_brief_display_key(self):
        context = self._context
        key = self._cache_key

        # Never slice past the end of a key shorter than the configured length
        length = min(len(key), context.log_key_length)
        return key[:length]

    # query_cache():
    #
    # Check if the element sources are cached in CAS, generating the source
    # cache keys if needed.
    #
    # Returns:
    #    (bool): True if the element sources are cached
    #
    def query_cache(self):
        cas = self._context.get_cascache()
        elementsourcescache = self._elementsourcescache

        source_proto = elementsourcescache.load_proto(self)
        if not source_proto:
            self._cached = False
            return False

        # The proto alone is not enough, the directory it refers to
        # must be present in CAS as well
        if not cas.contains_directory(source_proto.files, with_files=True):
            self._cached = False
            return False

        self._proto = source_proto
        self._cached = True
        return True

    # can_query_cache():
    #
    # Returns whether the cache status is available.
    #
    # Returns:
    #    (bool): True if cache status is available
    #
    def can_query_cache(self):
        return self._cached is not None

    # cached()
    #
    # Return whether the element sources are cached in CAS. This must be
    # called only when all sources are resolved and the cache status is
    # available (see can_query_cache()).
    #
    # Returns:
    #    (bool): True if the element sources are cached
    #
    def cached(self):
        assert self._cached is not None

        return self._cached

    # is_resolved():
    #
    # Get whether all sources of the element are resolved
    #
    # Returns:
    #    (bool): True if all element sources are resolved
    #
    def is_resolved(self):
        return self._is_resolved

    # cached_original():
    #
    # Get whether all the sources of the element have their own cached
    # copy of their sources.
    #
    # Returns:
    #    (bool): True if all element sources have the original sources cached
    #
    def cached_original(self):
        return all(source._is_cached() for source in self._sources)

    # update_resolved_state():
    #
    # Updates source's resolved state
    #
    # An element's source state must be resolved before it may compute
    # cache keys, because the source's ref, whether defined in yaml or
    # from the workspace, is a component of the element's cache keys.
    #
    def update_resolved_state(self):
        if self._is_resolved:
            # Already resolved
            return

        for source in self._sources:
            if not source.is_resolved():
                # Give up at the first unresolved source
                return

            # Source is resolved, generate its cache key
            source._generate_key()

        self._is_resolved = True

        # Also generate the cache key for the combined element sources
        unique_key = self.get_unique_key()
        self._cache_key = _cachekey.generate_key(unique_key)

    # preflight():
    #
    # An internal wrapper for calling the abstract preflight() method on
    # the element and its sources.
    #
    # Raises:
    #    ElementError: If the first source requires access to previous sources
    #
    def preflight(self):
        # Ensure that the first source does not need access to previous sources
        if self._sources and self._sources[0]._requires_previous_sources():
            from .element import ElementError  # pylint: disable=cyclic-import

            # Name the plugin in the message, this object itself has no
            # meaningful string representation
            raise ElementError(
                "{}: {} cannot be the first source of an element "
                "as it requires access to previous sources".format(self._plugin, self._sources[0])
            )

        # Preflight the sources
        for source in self.sources():
            source._preflight()

    # _fetch_source():
    #
    # Fetch a single source into the local CAS-based source cache
    #
    # Args:
    #    source (Source): The source to fetch
    #
    def _fetch_source(self, source):
        # Cannot store a source in the CAS-based source cache on its own
        # if the source depends on previous sources.
        assert not source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH and not source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE

        if self._sourcecache.contains(source):
            # Already cached
            return

        cached_original = source._is_cached()
        if not cached_original:
            if self._sourcecache.has_fetch_remotes() and self._sourcecache.pull(source):
                # Successfully fetched individual source from remote source cache
                return

            # Unable to fetch source from remote source cache, fall back to
            # fetching the original source.
            source._fetch()

        # Stage original source into the local CAS-based source cache
        self._sourcecache.commit(source)

    # _fetch_original_source():
    #
    # Fetch a single original source
    #
    # Args:
    #    source (Source): The source to fetch
    #
    def _fetch_original_source(self, source):
        if source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH:
            with self._stage_previous_sources(source) as staging_directory:
                source._fetch(previous_sources_dir=staging_directory)
        else:
            source._fetch()

    # _stage():
    #
    # Stage the element sources
    #
    # Args:
    #    stop (Source): Only stage sources listed before this source
    #
    # Returns:
    #    (CasBasedDirectory): The directory holding the staged sources
    #
    def _stage(self, *, stop=None):
        cas = self._context.get_cascache()
        vdir = CasBasedDirectory(cas)

        for source in self._sources:
            if source == stop:
                break

            if source._directory:
                vsubdir = vdir.open_directory(source._directory.lstrip(os.path.sep), create=True)
            else:
                vsubdir = vdir

            if source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH or source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
                if source.BST_STAGE_VIRTUAL_DIRECTORY:
                    source._stage(vsubdir)
                else:
                    # Stage previous sources
                    with cas.stage_directory(vsubdir._get_digest()) as tmpdir:
                        # Stage current source
                        source._stage(tmpdir)

                        # Capture modified tree
                        vsubdir._clear()
                        vsubdir.import_files(tmpdir, collect_result=False)
            else:
                source_dir = self._sourcecache.export(source)
                vsubdir.import_files(source_dir, collect_result=False)

        return vdir

    # _stage_previous_sources():
    #
    # Context manager that stages the sources listed before `source` in a
    # cas based or temporary file based directory
    #
    # Args:
    #    source (Source): The source which needs access to its previous sources
    #
    # Yields:
    #    The virtual directory if the source sets BST_STAGE_VIRTUAL_DIRECTORY,
    #    otherwise whatever the CAS cache's stage_directory() context yields
    #
    @contextmanager
    def _stage_previous_sources(self, source):
        self.fetch_sources(stop=source)
        vdir = self._stage(stop=source)

        if source._directory:
            # Strip leading separators, consistently with _stage()
            vdir = vdir.open_directory(source._directory.lstrip(os.path.sep), create=True)

        if source.BST_STAGE_VIRTUAL_DIRECTORY:
            yield vdir
        else:
            cas = self._context.get_cascache()
            with cas.stage_directory(vdir._get_digest()) as tempdir:
                yield tempdir