Merge pull request #1450 from apache/juerg/cache-query
Make artifact and source cache query explicit
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index d4a716f..da1e03a 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -540,7 +540,7 @@
return dependency_refs
- # cached():
+ # query_cache():
#
# Check whether the artifact corresponding to the stored cache key is
# available. This also checks whether all required parts of the artifact
@@ -550,11 +550,7 @@
# Returns:
# (bool): Whether artifact is in local cache
#
- def cached(self):
-
- if self._cached is not None:
- return self._cached
-
+ def query_cache(self):
context = self._context
artifact = self._load_proto()
@@ -587,6 +583,21 @@
self._cached = True
return True
+ # cached()
+ #
+ # Return whether the artifact (and, if `buildtree` is set, its buildtree
+ # where one exists) is cached. Call after `query_cache()`/`set_cached()`.
+ #
+ # Returns:
+ # (bool): Whether artifact is in local cache
+ #
+ def cached(self, *, buildtree=False):
+ assert self._cached is not None
+ ret = self._cached
+ if buildtree:
+ ret = ret and (self.cached_buildtree() or not self.buildtree_exists())
+ return ret
+
# cached_logs()
#
# Check if the artifact is cached with log files.
@@ -600,15 +611,6 @@
# If the artifact is cached, its log files are available as well.
return self._element._cached()
- # reset_cached()
- #
- # Allow the Artifact to query the filesystem to determine whether it
- # is cached or not.
- #
- def reset_cached(self):
- self._proto = None
- self._cached = None
-
# set_cached()
#
# Mark the artifact as cached without querying the filesystem.
diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py
index 06e48c1..cf7f55d 100644
--- a/src/buildstream/_artifactelement.py
+++ b/src/buildstream/_artifactelement.py
@@ -51,17 +51,10 @@
def __init__(self, context, ref):
project_name, element_name, key = verify_artifact_ref(ref)
- # At this point we only know the key which was specified on the command line,
- # so we will pretend all keys are equal.
- #
- # If the artifact is cached, then the real keys will be loaded from the
- # artifact instead.
- #
- artifact = Artifact(self, context, strong_key=key, strict_key=key, weak_key=key)
project = ArtifactProject(project_name, context)
load_element = LoadElement(Node.from_dict({}), element_name, project.loader) # NOTE element has no .bst suffix
- super().__init__(context, project, load_element, None, artifact=artifact)
+ super().__init__(context, project, load_element, None, artifact_key=key)
########################################################
# Public API #
@@ -128,12 +121,17 @@
# Override internal Element methods #
########################################################
- # Once we've finished pulling an artifact, we assume the
- # state of the pulled artifact.
+ def _load_artifact(self, *, pull, strict=None): # pylint: disable=useless-super-delegation
+ # Always operate in strict mode as artifact key has been specified explicitly.
+ return super()._load_artifact(pull=pull, strict=True)
+
+ # Once we've finished loading an artifact, we assume the
+ # state of the loaded artifact. This is also used if the
+ # artifact is loaded after pulling.
#
- def _pull_done(self):
- super()._pull_done()
+ def _load_artifact_done(self):
self._mimic_artifact()
+ super()._load_artifact_done()
########################################################
# Implement Element abstract methods #
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index a15d20e..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@
# SourceError: If one of the element sources has an error
#
def fetch(self):
+ if self._cached is None:
+ self.query_cache()
+
if self.cached():
return
@@ -293,7 +296,7 @@
length = min(len(key), context.log_key_length)
return key[:length]
- # cached():
+ # query_cache():
#
# Check if the element sources are cached in CAS, generating the source
# cache keys if needed.
@@ -301,10 +304,7 @@
# Returns:
# (bool): True if the element sources are cached
#
- def cached(self):
- if self._cached is not None:
- return self._cached
-
+ def query_cache(self):
cas = self._context.get_cascache()
elementsourcescache = self._elementsourcescache
@@ -321,6 +321,28 @@
self._cached = True
return True
+ # can_query_cache():
+ #
+ # Returns whether the cache status is available.
+ #
+ # Returns:
+ # (bool): True if cache status is available
+ #
+ def can_query_cache(self):
+ return self._cached is not None
+
+ # cached()
+ #
+ # Return whether the element sources are cached in CAS. This must be
+ # called after `query_cache()`, and only when all sources are resolved.
+ #
+ # Returns:
+ # (bool): True if the element sources are cached
+ #
+ def cached(self):
+ assert self._cached is not None
+ return self._cached
+
# is_resolved():
#
# Get whether all sources of the element are resolved
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 6a9a18b..f20d98f 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -606,6 +606,8 @@
dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
+ app.stream.query_cache(dependencies)
+
if order == "alpha":
dependencies = sorted(dependencies)
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 99c9f42..589e548 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -357,10 +357,14 @@
else:
if element.get_kind() == "junction":
line = p.fmt_subst(line, "state", "junction", fg="magenta")
+ elif not element._can_query_cache():
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._cached_failure():
line = p.fmt_subst(line, "state", "failed", fg="red")
elif element._cached_success():
line = p.fmt_subst(line, "state", "cached", fg="magenta")
+ elif not element._can_query_source_cache():
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._fetch_needed():
line = p.fmt_subst(line, "state", "fetch needed", fg="red")
elif element._buildable():
diff --git a/src/buildstream/_loader/loadelement.pyx b/src/buildstream/_loader/loadelement.pyx
index 210869e..f69e138 100644
--- a/src/buildstream/_loader/loadelement.pyx
+++ b/src/buildstream/_loader/loadelement.pyx
@@ -286,7 +286,6 @@
from ..element import Element
element = Element._new_from_load_element(self)
- element._initialize_state()
# Custom error for link dependencies, since we don't completely
# parse their dependencies we cannot rely on the built-in ElementError.
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 48fb1c4..df4df84 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -816,7 +816,6 @@
)
element = Element._new_from_load_element(load_element)
- element._initialize_state()
# Handle the case where a subproject has no ref
#
@@ -830,6 +829,7 @@
# Handle the case where a subproject needs to be fetched
#
+ element._query_source_cache()
if element._should_fetch():
self.load_context.fetch_subprojects([element])
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index e1e6dcf..802e52b 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -88,10 +88,7 @@
return elements
def plan() -> List[Element]:
- # Keep locally cached elements in the plan if remote artifact cache is used
- # to allow pulling artifact with strict cache key, if available.
- plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes()
- return _Planner().plan(targets, plan_cached)
+ return _Planner().plan(targets)
# Work around python not having a switch statement; this is
# much clearer than the if/elif/else block we used to have.
@@ -293,9 +290,8 @@
# _Planner()
#
# An internal object used for constructing build plan
-# from a given resolved toplevel element, while considering what
-# parts need to be built depending on build only dependencies
-# being cached, and depth sorting for more efficient processing.
+# from a given resolved toplevel element, using depth
+# sorting for more efficient processing.
#
class _Planner:
def __init__(self):
@@ -319,15 +315,13 @@
for dep in element._dependencies(_Scope.RUN, recurse=False):
self.plan_element(dep, depth)
- # Dont try to plan builds of elements that are cached already
- if not element._cached_success():
- for dep in element._dependencies(_Scope.BUILD, recurse=False):
- self.plan_element(dep, depth + 1)
+ for dep in element._dependencies(_Scope.BUILD, recurse=False):
+ self.plan_element(dep, depth + 1)
self.depth_map[element] = depth
self.visiting_elements.remove(element)
- def plan(self, roots, plan_cached):
+ def plan(self, roots):
for root in roots:
self.plan_element(root, 0)
@@ -337,4 +331,4 @@
for index, item in enumerate(depth_sorted):
item[0]._set_depth(index)
- return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+ return [item[0] for item in depth_sorted]
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 18bf392..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -50,13 +50,13 @@
if not element._can_query_cache():
return QueueStatus.PENDING
- if element._cached():
+ if element._cached_success():
return QueueStatus.SKIP
# This will automatically skip elements which
# have no sources.
- if not element._should_fetch(self._should_fetch_original):
+ if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
return QueueStatus.SKIP
return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d6959..925ded0 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -21,7 +21,6 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
-from ..jobs import JobStatus
from ..._exceptions import SkipJob
@@ -37,28 +36,15 @@
return PullQueue._pull_or_skip
def status(self, element):
- if not element._can_query_cache():
- return QueueStatus.PENDING
-
if element._pull_pending():
return QueueStatus.READY
else:
return QueueStatus.SKIP
def done(self, _, element, result, status):
-
- if status is JobStatus.FAIL:
- return
-
- element._pull_done()
-
- def register_pending_element(self, element):
- # Set a "can_query_cache"_callback for an element which is not
- # immediately ready to query the artifact cache so that it
- # may be pulled.
- element._set_can_query_cache_callback(self._enqueue_element)
+ element._load_artifact_done()
@staticmethod
def _pull_or_skip(element):
- if not element._pull():
+ if not element._load_artifact(pull=True):
raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a44d613..004a35d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -177,6 +177,35 @@
)
return target_objects
+ # query_cache()
+ #
+ # Query the artifact and source caches to determine the cache status
+ # of the specified elements.
+ #
+ # Args:
+ # elements (list of Element): The elements to check
+ # sources (bool): True to only query the source cache
+ #
+ def query_cache(self, elements, *, sources=False):
+ with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+ # Walk the complete build plan as this is required to determine `buildable` status.
+ plan = list(_pipeline.dependencies(elements, _Scope.ALL))
+
+ for element in plan:
+ if element._can_query_cache():
+ # Cache status already available.
+ # This is the case for artifact elements, which load the
+ # artifact early on.
+ pass
+ elif not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+ element._load_artifact(pull=False)
+ if not element._can_query_cache() or not element._cached_success():
+ element._query_source_cache()
+ if not element._pull_pending():
+ element._load_artifact_done()
+ elif element._has_all_sources_resolved():
+ element._query_source_cache()
+
# shell()
#
# Run a shell
@@ -241,6 +270,8 @@
element = self.targets[0]
element._set_required(scope)
+ self.query_cache([element] + elements)
+
if pull_:
self._reset()
self._add_queue(PullQueue(self._scheduler))
@@ -280,6 +311,7 @@
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
+ self.query_cache([element], sources=True)
self._fetch([element])
_pipeline.assert_sources_cached(self._context, [element])
@@ -342,6 +374,8 @@
for element in self.targets:
element._set_artifact_files_required(scope=scope)
+ self.query_cache(elements)
+
# Now construct the queues
#
self._reset()
@@ -393,6 +427,8 @@
ignore_project_source_remotes=ignore_project_source_remotes,
)
+ self.query_cache(elements, sources=True)
+
# Delegated to a shared fetch method
self._fetch(elements, announce_session=True)
@@ -465,6 +501,8 @@
ignore_project_source_remotes=ignore_project_source_remotes,
)
+ self.query_cache(elements, sources=True)
+
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
@@ -513,6 +551,9 @@
raise StreamError("No artifact caches available for pulling artifacts")
_pipeline.assert_consistent(self._context, elements)
+
+ self.query_cache(elements)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -559,6 +600,8 @@
_pipeline.assert_consistent(self._context, elements)
+ self.query_cache(elements)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -621,6 +664,8 @@
self._check_location_writable(location, force=force, tar=tar)
+ self.query_cache(elements)
+
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -699,6 +744,8 @@
targets, selection=selection, connect_artifact_cache=True, load_artifacts=True
)
+ self.query_cache(target_objects)
+
if self._artifacts.has_fetch_remotes():
self._resolve_cached_remotely(target_objects)
@@ -718,6 +765,8 @@
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
artifact_logs = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -746,6 +795,8 @@
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
elements_to_files = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -835,6 +886,7 @@
)
# Assert all sources are cached in the source dir
+ self.query_cache(elements, sources=True)
self._fetch(elements)
_pipeline.assert_sources_cached(self._context, elements)
@@ -885,6 +937,7 @@
# If we're going to checkout, we need at least a fetch,
#
if not no_checkout:
+ self.query_cache(elements, sources=True)
self._fetch(elements, fetch_original=True)
expanded_directories = []
@@ -1547,6 +1600,8 @@
for element in artifacts:
element._set_required(_Scope.NONE)
+ self.query_cache(artifacts)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(artifacts)
@@ -1578,7 +1633,6 @@
# Now move on to loading primary selection.
#
- self._resolve_elements(self.targets)
selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False)
selected = _pipeline.except_elements(self.targets, selected, except_elements)
@@ -1594,40 +1648,6 @@
return selected
- # _resolve_elements()
- #
- # Resolve element state and cache keys.
- #
- # Args:
- # targets (list of Element): The list of toplevel element targets
- #
- def _resolve_elements(self, targets):
- with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
- # We need to go through the project to access the loader
- #
- # FIXME: We need to calculate the total elements to resolve differently so that
- # it can include artifact elements
- #
- if task and self._project:
- task.set_maximum_progress(self._project.loader.loaded)
-
- # XXX: Now that Element._update_state() can trigger recursive update_state calls
- # it is possible that we could get a RecursionError. However, this is unlikely
- # to happen, even for large projects (tested with the Debian stack). Although,
- # if it does become a problem we may have to set the recursion limit to a
- # greater value.
- for element in _pipeline.dependencies(targets, _Scope.ALL):
- # Determine initial element state.
- element._initialize_state()
-
- # We may already have Elements which are cached and have their runtimes
- # cached, if this is the case, we should immediately notify their reverse
- # dependencies.
- element._update_ready_for_runtime_and_cached()
-
- if task:
- task.add_current_progress()
-
# _reset()
#
# Resets the internal state related to a given scheduler run.
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 503361e..6a296bf 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -226,7 +226,7 @@
load_element: "LoadElement",
plugin_conf: Dict[str, Any],
*,
- artifact: Artifact = None,
+ artifact_key: str = None,
):
self.__cache_key_dict = None # Dict for cache key calculation
@@ -290,7 +290,7 @@
self.__sourcecache = context.sourcecache # Source cache
self.__assemble_scheduled = False # Element is scheduled to be assembled
self.__assemble_done = False # Element is assembled
- self.__pull_done = False # Whether pull was attempted
+ self.__pull_pending = False # Whether pull is pending
self.__cached_successfully = None # If the Element is known to be successfully cached
self.__splits = None # Resolved regex objects for computing split domains
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
@@ -318,8 +318,8 @@
self.__environment: Dict[str, str] = {}
self.__variables: Optional[Variables] = None
- if artifact:
- self.__initialize_from_artifact(artifact)
+ if artifact_key:
+ self.__initialize_from_artifact_key(artifact_key)
else:
self.__initialize_from_yaml(load_element, plugin_conf)
@@ -1135,6 +1135,8 @@
element.__preflight()
+ element._initialize_state()
+
if task:
task.add_current_progress()
@@ -1182,9 +1184,6 @@
# the artifact cache
#
def _cached(self):
- if not self.__artifact:
- return False
-
return self.__artifact.cached()
# _cached_remotely():
@@ -1300,7 +1299,17 @@
#
def _can_query_cache(self):
# cache cannot be queried until strict cache key is available
- return self.__strict_cache_key is not None
+ return self.__artifact is not None
+
+ # _can_query_source_cache():
+ #
+ # Returns whether the source cache status is available.
+ #
+ # Returns:
+ # (bool): True if source cache can be queried
+ #
+ def _can_query_source_cache(self):
+ return self.__sources.can_query_cache()
# _initialize_state()
#
@@ -1328,9 +1337,6 @@
#
# - __update_cache_keys()
# - Computes the strong and weak cache keys.
- # - _update_artifact_state()
- # - Computes the state of the element's artifact using the
- # cache key.
# - __schedule_assembly_when_necessary()
# - Schedules assembly of an element, iff its current state
# allows/necessitates it
@@ -1588,7 +1594,9 @@
def __should_schedule(self):
# We're processing if we're already scheduled, we've
# finished assembling or if we're waiting to pull.
- processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
+ processing = (
+ self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
+ )
# We should schedule a build when
return (
@@ -1654,7 +1662,7 @@
self.__artifact.set_cached()
self.__cached_successfully = True
else:
- self.__artifact.reset_cached()
+ self.__artifact.query_cache()
# When we're building in non-strict mode, we may have
# assembled everything to this point without a strong cache
@@ -1863,83 +1871,91 @@
# (bool): Whether a pull operation is pending
#
def _pull_pending(self):
- if self._get_workspace():
- # Workspace builds are never pushed to artifact servers
- return False
+ return self.__pull_pending
- # Check whether the pull has been invoked with a specific subdir requested
- # in user context, as to complete a partial artifact
- pull_buildtrees = self._get_context().pull_buildtrees
-
- if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
- if pull_buildtrees:
- # If we've specified a subdir, check if the subdir is cached locally
- # or if it's possible to get
- if self._cached_buildtree() or not self._buildtree_exists():
- return False
- else:
- return False
-
- # Pull is pending if artifact remote server available
- # and pull has not been attempted yet
- return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
-
- # _pull_done()
+ # _load_artifact_done()
#
- # Indicate that pull was attempted.
+ # Indicate that `_load_artifact()` has completed.
#
- # This needs to be called in the main process after a pull
+ # This needs to be called in the main process after `_load_artifact()`
# succeeds or fails so that we properly update the main
# process data model
#
# This will result in updating the element state.
#
- def _pull_done(self):
+ def _load_artifact_done(self):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- self.__pull_done = True
+ assert self.__artifact
- # Artifact may become cached after pulling, so let it query the
- # filesystem again to check
- self.__artifact.reset_cached()
+ context = self._get_context()
- # We may not have actually pulled an artifact - the pull may
- # have failed. We might therefore need to schedule assembly.
- self.__schedule_assembly_when_necessary()
- # If we've finished pulling, an artifact might now exist
- # locally, so we might need to update a non-strict strong
- # cache key.
- self.__update_cache_key_non_strict()
+ if not context.get_strict() and self.__artifact.cached():
+ # In non-strict mode, strong cache key becomes available when
+ # the artifact is cached
+ self.__update_cache_key_non_strict()
+
self._update_ready_for_runtime_and_cached()
- # _pull():
+ self.__schedule_assembly_when_necessary()
+
+ if self.__can_query_cache_callback is not None:
+ self.__can_query_cache_callback(self)
+ self.__can_query_cache_callback = None
+
+ # _load_artifact():
#
- # Pull artifact from remote artifact repository into local artifact cache.
+ # Load artifact from cache or pull it from remote artifact repository.
#
# Returns: True if the artifact has been downloaded, False otherwise
#
- def _pull(self):
+ def _load_artifact(self, *, pull, strict=None):
context = self._get_context()
- # Get optional specific subdir to pull and optional list to not pull
- # based off of user context
- pull_buildtrees = context.pull_buildtrees
+ if strict is None:
+ strict = context.get_strict()
- # Attempt to pull artifact without knowing whether it's available
- strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
- if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
- # Notify successful download
- return True
+ pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
- if not context.get_strict() and not self._cached():
- # In non-strict mode also try pulling weak artifact
- # if no weak artifact is cached yet.
- artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
- return artifact.pull(pull_buildtrees=pull_buildtrees)
- else:
- # No artifact has been downloaded
+ # First check whether we already have the strict artifact in the local cache
+ artifact = Artifact(
+ self,
+ context,
+ strict_key=self.__strict_cache_key,
+ strong_key=self.__strict_cache_key,
+ weak_key=self.__weak_cache_key,
+ )
+ artifact.query_cache()
+
+ self.__pull_pending = False
+ if not pull and not artifact.cached(buildtree=pull_buildtrees):
+ if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace():
+ # Artifact is not completely available in cache and artifact remote server is available.
+ # Stop artifact loading here as pull is required to proceed.
+ self.__pull_pending = True
+
+ # Attempt to pull artifact with the strict cache key
+ pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+ if artifact.cached() or strict:
+ self.__artifact = artifact
+ return pulled
+ elif self.__pull_pending:
return False
+ # In non-strict mode retry with weak cache key
+ artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+ artifact.query_cache()
+
+ # Attempt to pull artifact with the weak cache key
+ pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+ self.__artifact = artifact
+ return pulled
+
+ def _query_source_cache(self):
+ self.__sources.query_cache()
+
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
return True
@@ -2378,7 +2394,7 @@
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if not self.__ready_for_runtime_and_cached:
- if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+ if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success():
self.__ready_for_runtime_and_cached = True
# Notify reverse dependencies
@@ -2595,6 +2611,7 @@
return None
artifact = Artifact(self, self._get_context(), strong_key=workspace.last_build)
+ artifact.query_cache()
if not artifact.cached():
return None
@@ -2862,13 +2879,33 @@
self.__variables.expand(sandbox_config)
self.__sandbox_config = SandboxConfig.new_from_node(sandbox_config, platform=context.platform)
- # __initialize_from_artifact()
+ # __initialize_from_artifact_key()
#
- # Initialize the element state from an Artifact object
+ # Initialize the element state from an artifact key
#
- def __initialize_from_artifact(self, artifact: Artifact):
- self.__artifact = artifact
- self._mimic_artifact()
+ def __initialize_from_artifact_key(self, key: str):
+ # At this point we only know the key which was specified on the command line,
+ # so we will pretend all keys are equal.
+ #
+ # If the artifact is cached, then the real keys will be loaded from the
+ # artifact in `_load_artifact()` and `_load_artifact_done()`.
+ #
+ self.__cache_key = key
+ self.__strict_cache_key = key
+ self.__weak_cache_key = key
+
+ self._initialize_state()
+
+ # ArtifactElement requires access to the artifact early on to walk
+ # dependencies.
+ self._load_artifact(pull=False)
+
+ if not self._cached():
+ # Remotes are not initialized when artifact elements are loaded.
+ # Always consider pull pending if the artifact is not cached.
+ self.__pull_pending = True
+ else:
+ self._load_artifact_done()
@classmethod
def __compose_default_splits(cls, project, defaults, first_pass):
@@ -3244,59 +3281,10 @@
# In strict mode, the strong cache key always matches the strict cache key
self.__cache_key = self.__strict_cache_key
- # If we've newly calculated a cache key, our artifact's
- # current state will also change - after all, we can now find
- # a potential existing artifact.
- self.__update_artifact_state()
-
# Update the message kwargs in use for this plugin to dispatch messages with
#
self._message_kwargs["element_key"] = self._get_display_key()
- # __update_artifact_state()
- #
- # Updates the data involved in knowing about the artifact corresponding
- # to this element.
- #
- # If the state changes, this will subsequently call
- # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
- # possible.
- #
- # Element.__update_cache_keys() must be called before this to have
- # meaningful results, because the element must know its cache key before
- # it can check whether an artifact exists for that cache key.
- #
- def __update_artifact_state(self):
- assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- assert self.__artifact is None
-
- context = self._get_context()
-
- strict_artifact = Artifact(
- self,
- context,
- strong_key=self.__strict_cache_key,
- strict_key=self.__strict_cache_key,
- weak_key=self.__weak_cache_key,
- )
- if context.get_strict() or strict_artifact.cached():
- self.__artifact = strict_artifact
- else:
- self.__artifact = Artifact(
- self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
- )
-
- if not context.get_strict() and self.__artifact.cached():
- # In non-strict mode, strong cache key becomes available when
- # the artifact is cached
- self.__update_cache_key_non_strict()
-
- self.__schedule_assembly_when_necessary()
-
- if self.__can_query_cache_callback is not None:
- self.__can_query_cache_callback(self)
- self.__can_query_cache_callback = None
-
# __update_cache_key_non_strict()
#
# Calculates the strong cache key if it hasn't already been set.
@@ -3305,7 +3293,7 @@
# strict cache key, so no work needs to be done.
#
# When buildstream is not run in strict mode, this requires the artifact
- # state (as set in Element.__update_artifact_state()) to be set accordingly,
+ # state (as set in Element._load_artifact()) to be set accordingly,
# as the cache key can be loaded from the cache (possibly pulling from
# a remote cache).
#
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 858065d..24a1ee1 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -6,7 +6,6 @@
import pytest
from buildstream import _yaml
-from buildstream.types import _Scope
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -33,15 +32,12 @@
# Create a local artifact cache handle
artifactcache = context.artifactcache
- # Ensure the element's artifact memeber is initialised
- # This is duplicated from Pipeline.resolve_elements()
- # as this test does not use the cli frontend.
- for e in element._dependencies(_Scope.ALL):
- e._initialize_state()
-
# Initialize remotes
context.initialize_remotes(True, True, None, None)
+ # Query local cache
+ element._load_artifact(pull=False)
+
assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
assert element._push(), "Push operation failed"
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@
def test_fetch_consistency_error(cli, datafiles):
project = str(datafiles)
- # When the error occurs outside of the scheduler at load time,
- # then the SourceError is reported directly as the main error.
result = cli.run(project=project, args=["source", "fetch", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@
project = str(datafiles)
result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index a32c747..8919330 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -372,8 +372,8 @@
#
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == []
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" not in result.get_pushed_elements()
# Delete the artifact locally again.
cli.remove_artifact_from_cache(project, "target.bst")
@@ -385,8 +385,8 @@
)
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == ["target.bst"]
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" in result.get_pushed_elements()
# Ensure that when an artifact's size exceeds available disk space
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index 3dd686d..950cd83 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@
def test_track_consistency_error(cli, datafiles):
project = str(datafiles)
- # Track the element causing a consistency error
+ # Track the element causing a consistency error in `is_cached()`
result = cli.run(project=project, args=["source", "track", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
def test_track_consistency_bug(cli, datafiles):
project = str(datafiles)
- # Track the element causing an unhandled exception
+ # Track the element causing an unhandled exception in `is_cached()`
result = cli.run(project=project, args=["source", "track", "bug.bst"])
- # We expect BuildStream to fail gracefully, with no recorded exception.
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 86dac0b..f37242f 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -74,7 +74,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -114,9 +114,9 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
# check that we have the source in the cas now and it's not fetched
+ element._query_source_cache()
assert element._cached_sources()
assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
@@ -134,7 +134,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -152,7 +152,7 @@
# Check that the source in both in the source dir and the local CAS
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert element._cached_sources()
@@ -168,7 +168,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -200,7 +200,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index edbcfdf..47e845b 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -86,7 +86,7 @@
project.ensure_fully_loaded()
element = project.load_elements(["push.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -136,7 +136,7 @@
project.ensure_fully_loaded()
element = project.load_elements(["push.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..6671c79 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -64,7 +64,7 @@
# now check that the source is in the refs file, this is pretty messy but
# seems to be the only way to get the sources?
element = project.load_elements(["import-bin.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
assert sourcecache.contains(source)
@@ -99,7 +99,7 @@
sourcecache = context.sourcecache
element = project.load_elements(["import-dev.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
@@ -133,9 +133,9 @@
project.ensure_fully_loaded()
element = project.load_elements(["import-dev.bst"])[0]
- element._initialize_state()
# check consistency of the source
+ element._query_source_cache()
assert not element._cached_sources()
res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- # Track will encounter an inconsistent submodule without any ref
+ # Track to update to the offending commit
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
+ # Fetch after track will encounter an inconsistent submodule without any ref
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+ result.assert_success()
+
# Assert that we are just fine without it, and emit a warning to the user.
assert "Ignoring inconsistent submodule" in result.stderr