Merge pull request #1450 from apache/juerg/cache-query

Make artifact and source cache query explicit
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index d4a716f..da1e03a 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -540,7 +540,7 @@
 
         return dependency_refs
 
-    # cached():
+    # query_cache():
     #
     # Check whether the artifact corresponding to the stored cache key is
     # available. This also checks whether all required parts of the artifact
@@ -550,11 +550,7 @@
     # Returns:
     #     (bool): Whether artifact is in local cache
     #
-    def cached(self):
-
-        if self._cached is not None:
-            return self._cached
-
+    def query_cache(self):
         context = self._context
 
         artifact = self._load_proto()
@@ -587,6 +583,21 @@
         self._cached = True
         return True
 
+    # cached()
+    #
+    # Return whether the artifact (and, with `buildtree=True`, its buildtree if one exists) is
+    # available in the local cache. This must be called after `query_cache()` or `set_cached()`.
+    #
+    # Returns:
+    #     (bool): Whether artifact is in local cache
+    #
+    def cached(self, *, buildtree=False):
+        assert self._cached is not None
+        ret = self._cached
+        if buildtree:
+            ret = ret and (self.cached_buildtree() or not self.buildtree_exists())
+        return ret
+
     # cached_logs()
     #
     # Check if the artifact is cached with log files.
@@ -600,15 +611,6 @@
         # If the artifact is cached, its log files are available as well.
         return self._element._cached()
 
-    # reset_cached()
-    #
-    # Allow the Artifact to query the filesystem to determine whether it
-    # is cached or not.
-    #
-    def reset_cached(self):
-        self._proto = None
-        self._cached = None
-
     # set_cached()
     #
     # Mark the artifact as cached without querying the filesystem.
diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py
index 06e48c1..cf7f55d 100644
--- a/src/buildstream/_artifactelement.py
+++ b/src/buildstream/_artifactelement.py
@@ -51,17 +51,10 @@
     def __init__(self, context, ref):
         project_name, element_name, key = verify_artifact_ref(ref)
 
-        # At this point we only know the key which was specified on the command line,
-        # so we will pretend all keys are equal.
-        #
-        # If the artifact is cached, then the real keys will be loaded from the
-        # artifact instead.
-        #
-        artifact = Artifact(self, context, strong_key=key, strict_key=key, weak_key=key)
         project = ArtifactProject(project_name, context)
         load_element = LoadElement(Node.from_dict({}), element_name, project.loader)  # NOTE element has no .bst suffix
 
-        super().__init__(context, project, load_element, None, artifact=artifact)
+        super().__init__(context, project, load_element, None, artifact_key=key)
 
     ########################################################
     #                      Public API                      #
@@ -128,12 +121,17 @@
     #         Override internal Element methods            #
     ########################################################
 
-    # Once we've finished pulling an artifact, we assume the
-    # state of the pulled artifact.
+    def _load_artifact(self, *, pull, strict=None):  # pylint: disable=useless-super-delegation
+        # Always operate in strict mode as artifact key has been specified explicitly.
+        return super()._load_artifact(pull=pull, strict=True)
+
+    # Once we've finished loading an artifact, we assume the
+    # state of the loaded artifact. This is also used if the
+    # artifact is loaded after pulling.
     #
-    def _pull_done(self):
-        super()._pull_done()
+    def _load_artifact_done(self):
         self._mimic_artifact()
+        super()._load_artifact_done()
 
     ########################################################
     #         Implement Element abstract methods           #
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index a15d20e..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@
     #    SourceError: If one of the element sources has an error
     #
     def fetch(self):
+        if self._cached is None:
+            self.query_cache()
+
         if self.cached():
             return
 
@@ -293,7 +296,7 @@
         length = min(len(key), context.log_key_length)
         return key[:length]
 
-    # cached():
+    # query_cache():
     #
     # Check if the element sources are cached in CAS, generating the source
     # cache keys if needed.
@@ -301,10 +304,7 @@
     # Returns:
     #    (bool): True if the element sources are cached
     #
-    def cached(self):
-        if self._cached is not None:
-            return self._cached
-
+    def query_cache(self):
         cas = self._context.get_cascache()
         elementsourcescache = self._elementsourcescache
 
@@ -321,6 +321,28 @@
         self._cached = True
         return True
 
+    # can_query_cache():
+    #
+    # Returns whether the cache status is available.
+    #
+    # Returns:
+    #    (bool): True if cache status is available
+    #
+    def can_query_cache(self):
+        return self._cached is not None
+
+    # cached()
+    #
+    # Return whether the element sources are cached in CAS. This must be
+    # called only after `query_cache()` has determined the cache status.
+    #
+    # Returns:
+    #    (bool): True if the element sources are cached
+    #
+    def cached(self):
+        assert self._cached is not None
+        return self._cached
+
     # is_resolved():
     #
     # Get whether all sources of the element are resolved
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 6a9a18b..f20d98f 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -606,6 +606,8 @@
 
         dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
 
+        app.stream.query_cache(dependencies)
+
         if order == "alpha":
             dependencies = sorted(dependencies)
 
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 99c9f42..589e548 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -357,10 +357,14 @@
                 else:
                     if element.get_kind() == "junction":
                         line = p.fmt_subst(line, "state", "junction", fg="magenta")
+                    elif not element._can_query_cache():
+                        line = p.fmt_subst(line, "state", "waiting", fg="blue")
                     elif element._cached_failure():
                         line = p.fmt_subst(line, "state", "failed", fg="red")
                     elif element._cached_success():
                         line = p.fmt_subst(line, "state", "cached", fg="magenta")
+                    elif not element._can_query_source_cache():
+                        line = p.fmt_subst(line, "state", "waiting", fg="blue")
                     elif element._fetch_needed():
                         line = p.fmt_subst(line, "state", "fetch needed", fg="red")
                     elif element._buildable():
diff --git a/src/buildstream/_loader/loadelement.pyx b/src/buildstream/_loader/loadelement.pyx
index 210869e..f69e138 100644
--- a/src/buildstream/_loader/loadelement.pyx
+++ b/src/buildstream/_loader/loadelement.pyx
@@ -286,7 +286,6 @@
             from ..element import Element
 
             element = Element._new_from_load_element(self)
-            element._initialize_state()
 
             # Custom error for link dependencies, since we don't completely
             # parse their dependencies we cannot rely on the built-in ElementError.
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 48fb1c4..df4df84 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -816,7 +816,6 @@
             )
 
         element = Element._new_from_load_element(load_element)
-        element._initialize_state()
 
         # Handle the case where a subproject has no ref
         #
@@ -830,6 +829,7 @@
 
         # Handle the case where a subproject needs to be fetched
         #
+        element._query_source_cache()
         if element._should_fetch():
             self.load_context.fetch_subprojects([element])
 
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index e1e6dcf..802e52b 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -88,10 +88,7 @@
         return elements
 
     def plan() -> List[Element]:
-        # Keep locally cached elements in the plan if remote artifact cache is used
-        # to allow pulling artifact with strict cache key, if available.
-        plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes()
-        return _Planner().plan(targets, plan_cached)
+        return _Planner().plan(targets)
 
     # Work around python not having a switch statement; this is
     # much clearer than the if/elif/else block we used to have.
@@ -293,9 +290,8 @@
 # _Planner()
 #
 # An internal object used for constructing build plan
-# from a given resolved toplevel element, while considering what
-# parts need to be built depending on build only dependencies
-# being cached, and depth sorting for more efficient processing.
+# from a given resolved toplevel element, using depth
+# sorting for more efficient processing.
 #
 class _Planner:
     def __init__(self):
@@ -319,15 +315,13 @@
         for dep in element._dependencies(_Scope.RUN, recurse=False):
             self.plan_element(dep, depth)
 
-        # Dont try to plan builds of elements that are cached already
-        if not element._cached_success():
-            for dep in element._dependencies(_Scope.BUILD, recurse=False):
-                self.plan_element(dep, depth + 1)
+        for dep in element._dependencies(_Scope.BUILD, recurse=False):
+            self.plan_element(dep, depth + 1)
 
         self.depth_map[element] = depth
         self.visiting_elements.remove(element)
 
-    def plan(self, roots, plan_cached):
+    def plan(self, roots):
         for root in roots:
             self.plan_element(root, 0)
 
@@ -337,4 +331,4 @@
         for index, item in enumerate(depth_sorted):
             item[0]._set_depth(index)
 
-        return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+        return [item[0] for item in depth_sorted]
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 18bf392..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -50,13 +50,13 @@
             if not element._can_query_cache():
                 return QueueStatus.PENDING
 
-            if element._cached():
+            if element._cached_success():
                 return QueueStatus.SKIP
 
         # This will automatically skip elements which
         # have no sources.
 
-        if not element._should_fetch(self._should_fetch_original):
+        if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
             return QueueStatus.SKIP
 
         return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d6959..925ded0 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -21,7 +21,6 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
-from ..jobs import JobStatus
 from ..._exceptions import SkipJob
 
 
@@ -37,28 +36,15 @@
         return PullQueue._pull_or_skip
 
     def status(self, element):
-        if not element._can_query_cache():
-            return QueueStatus.PENDING
-
         if element._pull_pending():
             return QueueStatus.READY
         else:
             return QueueStatus.SKIP
 
     def done(self, _, element, result, status):
-
-        if status is JobStatus.FAIL:
-            return
-
-        element._pull_done()
-
-    def register_pending_element(self, element):
-        # Set a "can_query_cache"_callback for an element which is not
-        # immediately ready to query the artifact cache so that it
-        # may be pulled.
-        element._set_can_query_cache_callback(self._enqueue_element)
+        element._load_artifact_done()
 
     @staticmethod
     def _pull_or_skip(element):
-        if not element._pull():
+        if not element._load_artifact(pull=True):
             raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a44d613..004a35d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -177,6 +177,35 @@
             )
             return target_objects
 
+    # query_cache()
+    #
+    # Query the artifact and source caches to determine the cache status
+    # of the specified elements.
+    #
+    # Args:
+    #    elements (list of Element): The elements to check
+    #    sources (bool): True to only query the source cache
+    #
+    def query_cache(self, elements, *, sources=False):
+        with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+            # Collect the complete build plan as this is required to determine `buildable` status.
+            plan = list(_pipeline.dependencies(elements, _Scope.ALL))
+
+            for element in plan:
+                if element._can_query_cache():
+                    # Cache status already available.
+                    # This is the case for artifact elements, which load the
+                    # artifact early on.
+                    pass
+                elif not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+                    element._load_artifact(pull=False)
+                    if not element._can_query_cache() or not element._cached_success():
+                        element._query_source_cache()
+                    if not element._pull_pending():
+                        element._load_artifact_done()
+                elif element._has_all_sources_resolved():
+                    element._query_source_cache()
+
     # shell()
     #
     # Run a shell
@@ -241,6 +270,8 @@
             element = self.targets[0]
             element._set_required(scope)
 
+            self.query_cache([element] + elements)
+
             if pull_:
                 self._reset()
                 self._add_queue(PullQueue(self._scheduler))
@@ -280,6 +311,7 @@
 
         # Ensure we have our sources if we are launching a build shell
         if scope == _Scope.BUILD and not usebuildtree:
+            self.query_cache([element], sources=True)
             self._fetch([element])
             _pipeline.assert_sources_cached(self._context, [element])
 
@@ -342,6 +374,8 @@
                 for element in self.targets:
                     element._set_artifact_files_required(scope=scope)
 
+        self.query_cache(elements)
+
         # Now construct the queues
         #
         self._reset()
@@ -393,6 +427,8 @@
             ignore_project_source_remotes=ignore_project_source_remotes,
         )
 
+        self.query_cache(elements, sources=True)
+
         # Delegated to a shared fetch method
         self._fetch(elements, announce_session=True)
 
@@ -465,6 +501,8 @@
             ignore_project_source_remotes=ignore_project_source_remotes,
         )
 
+        self.query_cache(elements, sources=True)
+
         if not self._sourcecache.has_push_remotes():
             raise StreamError("No source caches available for pushing sources")
 
@@ -513,6 +551,9 @@
             raise StreamError("No artifact caches available for pulling artifacts")
 
         _pipeline.assert_consistent(self._context, elements)
+
+        self.query_cache(elements)
+
         self._reset()
         self._add_queue(PullQueue(self._scheduler))
         self._enqueue_plan(elements)
@@ -559,6 +600,8 @@
 
         _pipeline.assert_consistent(self._context, elements)
 
+        self.query_cache(elements)
+
         self._reset()
         self._add_queue(PullQueue(self._scheduler))
         self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -621,6 +664,8 @@
 
         self._check_location_writable(location, force=force, tar=tar)
 
+        self.query_cache(elements)
+
         uncached_elts = [elt for elt in elements if not elt._cached()]
         if uncached_elts and pull:
             self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -699,6 +744,8 @@
             targets, selection=selection, connect_artifact_cache=True, load_artifacts=True
         )
 
+        self.query_cache(target_objects)
+
         if self._artifacts.has_fetch_remotes():
             self._resolve_cached_remotely(target_objects)
 
@@ -718,6 +765,8 @@
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         artifact_logs = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -746,6 +795,8 @@
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         elements_to_files = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -835,6 +886,7 @@
         )
 
         # Assert all sources are cached in the source dir
+        self.query_cache(elements, sources=True)
         self._fetch(elements)
         _pipeline.assert_sources_cached(self._context, elements)
 
@@ -885,6 +937,7 @@
         # If we're going to checkout, we need at least a fetch,
         #
         if not no_checkout:
+            self.query_cache(elements, sources=True)
             self._fetch(elements, fetch_original=True)
 
         expanded_directories = []
@@ -1547,6 +1600,8 @@
             for element in artifacts:
                 element._set_required(_Scope.NONE)
 
+            self.query_cache(artifacts)
+
             self._reset()
             self._add_queue(PullQueue(self._scheduler))
             self._enqueue_plan(artifacts)
@@ -1578,7 +1633,6 @@
 
         # Now move on to loading primary selection.
         #
-        self._resolve_elements(self.targets)
         selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False)
         selected = _pipeline.except_elements(self.targets, selected, except_elements)
 
@@ -1594,40 +1648,6 @@
 
         return selected
 
-    # _resolve_elements()
-    #
-    # Resolve element state and cache keys.
-    #
-    # Args:
-    #    targets (list of Element): The list of toplevel element targets
-    #
-    def _resolve_elements(self, targets):
-        with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
-            # We need to go through the project to access the loader
-            #
-            # FIXME: We need to calculate the total elements to resolve differently so that
-            #        it can include artifact elements
-            #
-            if task and self._project:
-                task.set_maximum_progress(self._project.loader.loaded)
-
-            # XXX: Now that Element._update_state() can trigger recursive update_state calls
-            # it is possible that we could get a RecursionError. However, this is unlikely
-            # to happen, even for large projects (tested with the Debian stack). Although,
-            # if it does become a problem we may have to set the recursion limit to a
-            # greater value.
-            for element in _pipeline.dependencies(targets, _Scope.ALL):
-                # Determine initial element state.
-                element._initialize_state()
-
-                # We may already have Elements which are cached and have their runtimes
-                # cached, if this is the case, we should immediately notify their reverse
-                # dependencies.
-                element._update_ready_for_runtime_and_cached()
-
-                if task:
-                    task.add_current_progress()
-
     # _reset()
     #
     # Resets the internal state related to a given scheduler run.
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 503361e..6a296bf 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -226,7 +226,7 @@
         load_element: "LoadElement",
         plugin_conf: Dict[str, Any],
         *,
-        artifact: Artifact = None,
+        artifact_key: str = None,
     ):
 
         self.__cache_key_dict = None  # Dict for cache key calculation
@@ -290,7 +290,7 @@
         self.__sourcecache = context.sourcecache  # Source cache
         self.__assemble_scheduled = False  # Element is scheduled to be assembled
         self.__assemble_done = False  # Element is assembled
-        self.__pull_done = False  # Whether pull was attempted
+        self.__pull_pending = False  # Whether pull is pending
         self.__cached_successfully = None  # If the Element is known to be successfully cached
         self.__splits = None  # Resolved regex objects for computing split domains
         self.__whitelist_regex = None  # Resolved regex object to check if file is allowed to overlap
@@ -318,8 +318,8 @@
         self.__environment: Dict[str, str] = {}
         self.__variables: Optional[Variables] = None
 
-        if artifact:
-            self.__initialize_from_artifact(artifact)
+        if artifact_key:
+            self.__initialize_from_artifact_key(artifact_key)
         else:
             self.__initialize_from_yaml(load_element, plugin_conf)
 
@@ -1135,6 +1135,8 @@
 
         element.__preflight()
 
+        element._initialize_state()
+
         if task:
             task.add_current_progress()
 
@@ -1182,9 +1184,6 @@
     #            the artifact cache
     #
     def _cached(self):
-        if not self.__artifact:
-            return False
-
         return self.__artifact.cached()
 
     # _cached_remotely():
@@ -1300,7 +1299,17 @@
     #
     def _can_query_cache(self):
         # cache cannot be queried until strict cache key is available
-        return self.__strict_cache_key is not None
+        return self.__artifact is not None
+
+    # _can_query_source_cache():
+    #
+    # Returns whether the source cache status is available.
+    #
+    # Returns:
+    #    (bool): True if source cache can be queried
+    #
+    def _can_query_source_cache(self):
+        return self.__sources.can_query_cache()
 
     # _initialize_state()
     #
@@ -1328,9 +1337,6 @@
     #
     # - __update_cache_keys()
     #   - Computes the strong and weak cache keys.
-    # - _update_artifact_state()
-    #   - Computes the state of the element's artifact using the
-    #     cache key.
     # - __schedule_assembly_when_necessary()
     #   - Schedules assembly of an element, iff its current state
     #     allows/necessitates it
@@ -1588,7 +1594,9 @@
     def __should_schedule(self):
         # We're processing if we're already scheduled, we've
         # finished assembling or if we're waiting to pull.
-        processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
+        processing = (
+            self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
+        )
 
         # We should schedule a build when
         return (
@@ -1654,7 +1662,7 @@
             self.__artifact.set_cached()
             self.__cached_successfully = True
         else:
-            self.__artifact.reset_cached()
+            self.__artifact.query_cache()
 
         # When we're building in non-strict mode, we may have
         # assembled everything to this point without a strong cache
@@ -1863,83 +1871,91 @@
     #   (bool): Whether a pull operation is pending
     #
     def _pull_pending(self):
-        if self._get_workspace():
-            # Workspace builds are never pushed to artifact servers
-            return False
+        return self.__pull_pending
 
-        # Check whether the pull has been invoked with a specific subdir requested
-        # in user context, as to complete a partial artifact
-        pull_buildtrees = self._get_context().pull_buildtrees
-
-        if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
-            if pull_buildtrees:
-                # If we've specified a subdir, check if the subdir is cached locally
-                # or if it's possible to get
-                if self._cached_buildtree() or not self._buildtree_exists():
-                    return False
-            else:
-                return False
-
-        # Pull is pending if artifact remote server available
-        # and pull has not been attempted yet
-        return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
-
-    # _pull_done()
+    # _load_artifact_done()
     #
-    # Indicate that pull was attempted.
+    # Indicate that `_load_artifact()` has completed.
     #
-    # This needs to be called in the main process after a pull
+    # This needs to be called in the main process after `_load_artifact()`
     # succeeds or fails so that we properly update the main
     # process data model
     #
     # This will result in updating the element state.
     #
-    def _pull_done(self):
+    def _load_artifact_done(self):
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
-        self.__pull_done = True
+        assert self.__artifact
 
-        # Artifact may become cached after pulling, so let it query the
-        # filesystem again to check
-        self.__artifact.reset_cached()
+        context = self._get_context()
 
-        # We may not have actually pulled an artifact - the pull may
-        # have failed. We might therefore need to schedule assembly.
-        self.__schedule_assembly_when_necessary()
-        # If we've finished pulling, an artifact might now exist
-        # locally, so we might need to update a non-strict strong
-        # cache key.
-        self.__update_cache_key_non_strict()
+        if not context.get_strict() and self.__artifact.cached():
+            # In non-strict mode, strong cache key becomes available when
+            # the artifact is cached
+            self.__update_cache_key_non_strict()
+
         self._update_ready_for_runtime_and_cached()
 
-    # _pull():
+        self.__schedule_assembly_when_necessary()
+
+        if self.__can_query_cache_callback is not None:
+            self.__can_query_cache_callback(self)
+            self.__can_query_cache_callback = None
+
+    # _load_artifact():
     #
-    # Pull artifact from remote artifact repository into local artifact cache.
+    # Load artifact from cache or pull it from remote artifact repository.
     #
     # Returns: True if the artifact has been downloaded, False otherwise
     #
-    def _pull(self):
+    def _load_artifact(self, *, pull, strict=None):
         context = self._get_context()
 
-        # Get optional specific subdir to pull and optional list to not pull
-        # based off of user context
-        pull_buildtrees = context.pull_buildtrees
+        if strict is None:
+            strict = context.get_strict()
 
-        # Attempt to pull artifact without knowing whether it's available
-        strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
-        if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
-            # Notify successful download
-            return True
+        pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
 
-        if not context.get_strict() and not self._cached():
-            # In non-strict mode also try pulling weak artifact
-            # if no weak artifact is cached yet.
-            artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
-            return artifact.pull(pull_buildtrees=pull_buildtrees)
-        else:
-            # No artifact has been downloaded
+        # First check whether we already have the strict artifact in the local cache
+        artifact = Artifact(
+            self,
+            context,
+            strict_key=self.__strict_cache_key,
+            strong_key=self.__strict_cache_key,
+            weak_key=self.__weak_cache_key,
+        )
+        artifact.query_cache()
+
+        self.__pull_pending = False
+        if not pull and not artifact.cached(buildtree=pull_buildtrees):
+            if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace():
+                # Artifact is not completely available in cache and artifact remote server is available.
+                # Stop artifact loading here as pull is required to proceed.
+                self.__pull_pending = True
+
+        # Attempt to pull artifact with the strict cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        if artifact.cached() or strict:
+            self.__artifact = artifact
+            return pulled
+        elif self.__pull_pending:
             return False
 
+        # In non-strict mode retry with weak cache key
+        artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+        artifact.query_cache()
+
+        # Attempt to pull artifact with the weak cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        self.__artifact = artifact
+        return pulled
+
+    def _query_source_cache(self):
+        self.__sources.query_cache()
+
     def _skip_source_push(self):
         if not self.sources() or self._get_workspace():
             return True
@@ -2378,7 +2394,7 @@
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
         if not self.__ready_for_runtime_and_cached:
-            if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+            if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success():
                 self.__ready_for_runtime_and_cached = True
 
                 # Notify reverse dependencies
@@ -2595,6 +2611,7 @@
             return None
 
         artifact = Artifact(self, self._get_context(), strong_key=workspace.last_build)
+        artifact.query_cache()
 
         if not artifact.cached():
             return None
@@ -2862,13 +2879,33 @@
         self.__variables.expand(sandbox_config)
         self.__sandbox_config = SandboxConfig.new_from_node(sandbox_config, platform=context.platform)
 
-    # __initialize_from_artifact()
+    # __initialize_from_artifact_key()
     #
-    # Initialize the element state from an Artifact object
+    # Initialize the element state from an artifact key
     #
-    def __initialize_from_artifact(self, artifact: Artifact):
-        self.__artifact = artifact
-        self._mimic_artifact()
+    def __initialize_from_artifact_key(self, key: str):
+        # At this point we only know the key which was specified on the command line,
+        # so we will pretend all keys are equal.
+        #
+        # If the artifact is cached, then the real keys will be loaded from the
+        # artifact in `_load_artifact()` and `_load_artifact_done()`.
+        #
+        self.__cache_key = key
+        self.__strict_cache_key = key
+        self.__weak_cache_key = key
+
+        self._initialize_state()
+
+        # ArtifactElement requires access to the artifact early on to walk
+        # dependencies.
+        self._load_artifact(pull=False)
+
+        if not self._cached():
+            # Remotes are not initialized when artifact elements are loaded.
+            # Always consider pull pending if the artifact is not cached.
+            self.__pull_pending = True
+        else:
+            self._load_artifact_done()
 
     @classmethod
     def __compose_default_splits(cls, project, defaults, first_pass):
@@ -3244,59 +3281,10 @@
             # In strict mode, the strong cache key always matches the strict cache key
             self.__cache_key = self.__strict_cache_key
 
-        # If we've newly calculated a cache key, our artifact's
-        # current state will also change - after all, we can now find
-        # a potential existing artifact.
-        self.__update_artifact_state()
-
         # Update the message kwargs in use for this plugin to dispatch messages with
         #
         self._message_kwargs["element_key"] = self._get_display_key()
 
-    # __update_artifact_state()
-    #
-    # Updates the data involved in knowing about the artifact corresponding
-    # to this element.
-    #
-    # If the state changes, this will subsequently call
-    # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
-    # possible.
-    #
-    # Element.__update_cache_keys() must be called before this to have
-    # meaningful results, because the element must know its cache key before
-    # it can check whether an artifact exists for that cache key.
-    #
-    def __update_artifact_state(self):
-        assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
-        assert self.__artifact is None
-
-        context = self._get_context()
-
-        strict_artifact = Artifact(
-            self,
-            context,
-            strong_key=self.__strict_cache_key,
-            strict_key=self.__strict_cache_key,
-            weak_key=self.__weak_cache_key,
-        )
-        if context.get_strict() or strict_artifact.cached():
-            self.__artifact = strict_artifact
-        else:
-            self.__artifact = Artifact(
-                self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
-            )
-
-        if not context.get_strict() and self.__artifact.cached():
-            # In non-strict mode, strong cache key becomes available when
-            # the artifact is cached
-            self.__update_cache_key_non_strict()
-
-        self.__schedule_assembly_when_necessary()
-
-        if self.__can_query_cache_callback is not None:
-            self.__can_query_cache_callback(self)
-            self.__can_query_cache_callback = None
-
     # __update_cache_key_non_strict()
     #
     # Calculates the strong cache key if it hasn't already been set.
@@ -3305,7 +3293,7 @@
     # strict cache key, so no work needs to be done.
     #
     # When buildstream is not run in strict mode, this requires the artifact
-    # state (as set in Element.__update_artifact_state()) to be set accordingly,
+    # state (as set in Element._load_artifact()) to be set accordingly,
     # as the cache key can be loaded from the cache (possibly pulling from
     # a remote cache).
     #
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 858065d..24a1ee1 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -6,7 +6,6 @@
 import pytest
 
 from buildstream import _yaml
-from buildstream.types import _Scope
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildstream.testing import cli  # pylint: disable=unused-import
@@ -33,15 +32,12 @@
         # Create a local artifact cache handle
         artifactcache = context.artifactcache
 
-        # Ensure the element's artifact memeber is initialised
-        # This is duplicated from Pipeline.resolve_elements()
-        # as this test does not use the cli frontend.
-        for e in element._dependencies(_Scope.ALL):
-            e._initialize_state()
-
         # Initialize remotes
         context.initialize_remotes(True, True, None, None)
 
+        # Query local cache
+        element._load_artifact(pull=False)
+
         assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
         assert element._push(), "Push operation failed"
 
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@
 def test_fetch_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # When the error occurs outside of the scheduler at load time,
-    # then the SourceError is reported directly as the main error.
     result = cli.run(project=project, args=["source", "fetch", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@
     project = str(datafiles)
 
     result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index a32c747..8919330 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -372,8 +372,8 @@
         #
         result = cli.run(project=project, args=["build", "target.bst"])
         result.assert_success()
-        assert result.get_pulled_elements() == ["target.bst"]
-        assert result.get_pushed_elements() == []
+        assert "target.bst" in result.get_pulled_elements()
+        assert "target.bst" not in result.get_pushed_elements()
 
         # Delete the artifact locally again.
         cli.remove_artifact_from_cache(project, "target.bst")
@@ -385,8 +385,8 @@
         )
         result = cli.run(project=project, args=["build", "target.bst"])
         result.assert_success()
-        assert result.get_pulled_elements() == ["target.bst"]
-        assert result.get_pushed_elements() == ["target.bst"]
+        assert "target.bst" in result.get_pulled_elements()
+        assert "target.bst" in result.get_pushed_elements()
 
 
 # Ensure that when an artifact's size exceeds available disk space
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index 3dd686d..950cd83 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@
 def test_track_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing a consistency error
+    # Track the element causing a consistency error in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
 def test_track_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing an unhandled exception
+    # Track the element causing an unhandled exception in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "bug.bst"])
 
-    # We expect BuildStream to fail gracefully, with no recorded exception.
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 86dac0b..f37242f 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -74,7 +74,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -114,9 +114,9 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
 
             # check that we have the source in the cas now and it's not fetched
+            element._query_source_cache()
             assert element._cached_sources()
             assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
 
@@ -134,7 +134,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -152,7 +152,7 @@
 
             # Check that the source in both in the source dir and the local CAS
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert element._cached_sources()
 
 
@@ -168,7 +168,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -200,7 +200,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index edbcfdf..47e845b 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -86,7 +86,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements(["push.bst"])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -136,7 +136,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements(["push.bst"])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..6671c79 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -64,7 +64,7 @@
         # now check that the source is in the refs file, this is pretty messy but
         # seems to be the only way to get the sources?
         element = project.load_elements(["import-bin.bst"])[0]
-        element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
         assert sourcecache.contains(source)
@@ -99,7 +99,7 @@
         sourcecache = context.sourcecache
 
         element = project.load_elements(["import-dev.bst"])[0]
-        element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
 
@@ -133,9 +133,9 @@
         project.ensure_fully_loaded()
 
         element = project.load_elements(["import-dev.bst"])[0]
-        element._initialize_state()
 
         # check consistency of the source
+        element._query_source_cache()
         assert not element._cached_sources()
 
     res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@
     result = cli.run(project=project, args=["source", "fetch", "target.bst"])
     result.assert_success()
 
-    # Track will encounter an inconsistent submodule without any ref
+    # Track to update to the offending commit
     result = cli.run(project=project, args=["source", "track", "target.bst"])
     result.assert_success()
 
+    # Fetch after track will encounter an inconsistent submodule without any ref
+    result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+    result.assert_success()
+
     # Assert that we are just fine without it, and emit a warning to the user.
     assert "Ignoring inconsistent submodule" in result.stderr