Merge pull request #1446 from wjt/patch-3

doc/source/using_config.rst: fix "it's" typos
diff --git a/NEWS b/NEWS
index 989d587..efbbd74 100644
--- a/NEWS
+++ b/NEWS
@@ -2,6 +2,11 @@
 (unreleased)
 ============
 
+
+==================
+buildstream 1.93.6
+==================
+
 Format
 ------
   o The `script` element no longer has a `layout` configuration directly, and now
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index d4a716f..da1e03a 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -540,7 +540,7 @@
 
         return dependency_refs
 
-    # cached():
+    # query_cache():
     #
     # Check whether the artifact corresponding to the stored cache key is
     # available. This also checks whether all required parts of the artifact
@@ -550,11 +550,7 @@
     # Returns:
     #     (bool): Whether artifact is in local cache
     #
-    def cached(self):
-
-        if self._cached is not None:
-            return self._cached
-
+    def query_cache(self):
         context = self._context
 
         artifact = self._load_proto()
@@ -587,6 +583,21 @@
         self._cached = True
         return True
 
+    # cached()
+    #
+    # Return whether the artifact is available in the local cache. This must
+    # be called after `query_cache()` or `set_cached()`.
+    #
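+    # Args:
+    #     buildtree (bool): Whether the buildtree must also be cached
+    #                       (if one exists) for the artifact to count as cached
+    #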
+    # Returns:
+    #     (bool): Whether artifact is in local cache
+    #
+    def cached(self, *, buildtree=False):
+        assert self._cached is not None
+        ret = self._cached
+        if buildtree:
+            ret = ret and (self.cached_buildtree() or not self.buildtree_exists())
+        return ret
+
     # cached_logs()
     #
     # Check if the artifact is cached with log files.
@@ -600,15 +611,6 @@
         # If the artifact is cached, its log files are available as well.
         return self._element._cached()
 
-    # reset_cached()
-    #
-    # Allow the Artifact to query the filesystem to determine whether it
-    # is cached or not.
-    #
-    def reset_cached(self):
-        self._proto = None
-        self._cached = None
-
     # set_cached()
     #
     # Mark the artifact as cached without querying the filesystem.
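To illustrate the split this hunk makes, here is a minimal standalone sketch (not the real Artifact class): the potentially expensive cache check becomes an explicit query step, and cached() turns into a cheap accessor that requires the query (or set_cached()) to have run first.

    class _CacheState:
        def __init__(self):
            self._cached = None  # Unknown until queried

        def query_cache(self):
            # Stand-in for the real check against the local artifact cache
            self._cached = False
            return self._cached

        def set_cached(self):
            self._cached = True

        def cached(self):
            # Cheap accessor; query_cache() or set_cached() must run first
            assert self._cached is not None
            return self._cached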
diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py
index 06e48c1..cf7f55d 100644
--- a/src/buildstream/_artifactelement.py
+++ b/src/buildstream/_artifactelement.py
@@ -51,17 +51,10 @@
     def __init__(self, context, ref):
         project_name, element_name, key = verify_artifact_ref(ref)
 
-        # At this point we only know the key which was specified on the command line,
-        # so we will pretend all keys are equal.
-        #
-        # If the artifact is cached, then the real keys will be loaded from the
-        # artifact instead.
-        #
-        artifact = Artifact(self, context, strong_key=key, strict_key=key, weak_key=key)
         project = ArtifactProject(project_name, context)
         load_element = LoadElement(Node.from_dict({}), element_name, project.loader)  # NOTE element has no .bst suffix
 
-        super().__init__(context, project, load_element, None, artifact=artifact)
+        super().__init__(context, project, load_element, None, artifact_key=key)
 
     ########################################################
     #                      Public API                      #
@@ -128,12 +121,17 @@
     #         Override internal Element methods            #
     ########################################################
 
-    # Once we've finished pulling an artifact, we assume the
-    # state of the pulled artifact.
+    def _load_artifact(self, *, pull, strict=None):  # pylint: disable=useless-super-delegation
+        # Always operate in strict mode, as the artifact key has been specified explicitly.
+        return super()._load_artifact(pull=pull, strict=True)
+
+    # Once we've finished loading an artifact, we assume the
+    # state of the loaded artifact. This is also used if the
+    # artifact is loaded after pulling.
     #
-    def _pull_done(self):
-        super()._pull_done()
+    def _load_artifact_done(self):
         self._mimic_artifact()
+        super()._load_artifact_done()
 
     ########################################################
     #         Implement Element abstract methods           #
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index a15d20e..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@
     #    SourceError: If one of the element sources has an error
     #
     def fetch(self):
+        if self._cached is None:
+            self.query_cache()
+
         if self.cached():
             return
 
@@ -293,7 +296,7 @@
         length = min(len(key), context.log_key_length)
         return key[:length]
 
-    # cached():
+    # query_cache():
     #
     # Check if the element sources are cached in CAS, generating the source
     # cache keys if needed.
@@ -301,10 +304,7 @@
     # Returns:
     #    (bool): True if the element sources are cached
     #
-    def cached(self):
-        if self._cached is not None:
-            return self._cached
-
+    def query_cache(self):
         cas = self._context.get_cascache()
         elementsourcescache = self._elementsourcescache
 
@@ -321,6 +321,28 @@
         self._cached = True
         return True
 
+    # can_query_cache():
+    #
+    # Returns whether the cache status is available.
+    #
+    # Returns:
+    #    (bool): True if cache status is available
+    #
+    def can_query_cache(self):
+        return self._cached is not None
+
+    # cached()
+    #
+    # Return whether the element sources are cached in CAS. This must only
+    # be called after `query_cache()`, which requires all sources to be resolved.
+    #
+    # Returns:
+    #    (bool): True if the element sources are cached
+    #
+    def cached(self):
+        assert self._cached is not None
+        return self._cached
+
     # is_resolved():
     #
     # Get whether all sources of the element are resolved
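Callers are expected to establish the cache status before reading it, roughly as in this hypothetical helper (the helper itself is not part of the change):

    def sources_cached(sources):
        # Hypothetical: ensure the status is known before reading it
        if not sources.can_query_cache():
            sources.query_cache()
        return sources.cached()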
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 6a9a18b..f20d98f 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -606,6 +606,8 @@
 
         dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
 
+        app.stream.query_cache(dependencies)
+
         if order == "alpha":
             dependencies = sorted(dependencies)
 
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 99c9f42..589e548 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -357,10 +357,14 @@
                 else:
                     if element.get_kind() == "junction":
                         line = p.fmt_subst(line, "state", "junction", fg="magenta")
+                    elif not element._can_query_cache():
+                        line = p.fmt_subst(line, "state", "waiting", fg="blue")
                     elif element._cached_failure():
                         line = p.fmt_subst(line, "state", "failed", fg="red")
                     elif element._cached_success():
                         line = p.fmt_subst(line, "state", "cached", fg="magenta")
+                    elif not element._can_query_source_cache():
+                        line = p.fmt_subst(line, "state", "waiting", fg="blue")
                     elif element._fetch_needed():
                         line = p.fmt_subst(line, "state", "fetch needed", fg="red")
                     elif element._buildable():
diff --git a/src/buildstream/_loader/loadelement.pyx b/src/buildstream/_loader/loadelement.pyx
index 210869e..f69e138 100644
--- a/src/buildstream/_loader/loadelement.pyx
+++ b/src/buildstream/_loader/loadelement.pyx
@@ -286,7 +286,6 @@
             from ..element import Element
 
             element = Element._new_from_load_element(self)
-            element._initialize_state()
 
             # Custom error for link dependencies, since we don't completely
             # parse their dependencies we cannot rely on the built-in ElementError.
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 48fb1c4..df4df84 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -816,7 +816,6 @@
             )
 
         element = Element._new_from_load_element(load_element)
-        element._initialize_state()
 
         # Handle the case where a subproject has no ref
         #
@@ -830,6 +829,7 @@
 
         # Handle the case where a subproject needs to be fetched
         #
+        element._query_source_cache()
         if element._should_fetch():
             self.load_context.fetch_subprojects([element])
 
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 3bd98cd..edb79ec 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,7 @@
 
 from . import _signals
 from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
 from ._state import State, Task
 
 
@@ -48,6 +48,13 @@
         self.start_time: datetime.datetime = start_time
 
 
+class _JobInfo:
+    def __init__(self, action_name: str, element_name: str, element_key: str) -> None:
+        self.action_name = action_name
+        self.element_name = element_name
+        self.element_key = element_key
+
+
 # _MessengerLocal
 #
 # Thread local storage for the messenger
@@ -56,13 +63,6 @@
     def __init__(self) -> None:
         super().__init__()
 
-        # The callback to call when propagating messages
-        #
-        # FIXME: The message handler is currently not strongly typed,
-        #        as it uses a kwarg, we cannot declare it with Callable.
-        #        We can use `Protocol` to strongly type this with python >= 3.8
-        self.message_handler = None
-
         # The open file handle for this task
         self.log_handle: Optional[TextIO] = None
 
@@ -72,6 +72,9 @@
         # Level of silent messages depth in this task
         self.silence_scope_depth: int = 0
 
+        # The job currently being processed by this thread, if any
+        self.job: Optional[_JobInfo] = None
+
 
 # Messenger()
 #
@@ -97,13 +100,24 @@
         # Thread local storage
         self._locals: _MessengerLocal = _MessengerLocal()
 
+        # The callback to call when propagating messages
+        #
+        # FIXME: The message handler is currently not strongly typed,
+        #        as it uses a kwarg, we cannot declare it with Callable.
+        #        We can use `Protocol` to strongly type this with python >= 3.8
+        self._message_handler = None
+
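+    # setup_new_action_context()
+    #
+    # Reset the silence depth and record the job's action name, element name
+    # and element key in thread-local storage, so that messages emitted from
+    # this thread are annotated with the job context automatically.
+    #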
+    def setup_new_action_context(self, action_name: str, element_name: str, element_key: str) -> None:
+        self._locals.silence_scope_depth = 0
+        self._locals.job = _JobInfo(action_name, element_name, element_key)
+
     # set_message_handler()
     #
     # Sets the handler for any status messages propagated through
     # the messenger.
     #
     def set_message_handler(self, handler) -> None:
-        self._locals.message_handler = handler
+        self._message_handler = handler
 
     # set_state()
     #
@@ -137,12 +151,31 @@
         # If we are recording messages, dump a copy into the open log file.
         self._record_message(message)
 
+        # Always add the log filename automatically
+        message.logfile = self._locals.log_filename
+
+        is_silenced = self._silent_messages()
+        job = self._locals.job
+
+        if job is not None:
+            # Automatically add message information from the job context
+            message.action_name = job.action_name
+            message.task_element_name = job.element_name
+            message.task_element_key = job.element_key
+
+            # Don't forward LOG messages from jobs
+            if message.message_type == MessageType.LOG:
+                return
+
+            # Don't forward JOB messages if they are currently silent
+            if is_silenced and (message.message_type not in unconditional_messages):
+                return
+
         # Send it off to the log handler (can be the frontend,
         # or it can be the child task which will propagate
         # to the frontend)
-        assert self._locals.message_handler
-
-        self._locals.message_handler(message, is_silenced=self._silent_messages())
+        assert self._message_handler
+        self._message_handler(message, is_silenced=is_silenced)
 
     # status():
     #
@@ -362,7 +395,6 @@
         # Create the fully qualified logfile in the log directory,
         # appending the pid and .log extension at the end.
         self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
-        self._locals.silence_scope_depth = 0
 
         # Ensure the directory exists first
         directory = os.path.dirname(self._locals.log_filename)
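As a rough standalone model of the new approach (not the actual Messenger code), per-thread job information set up once per action lets messages be annotated without a message pipe:

    import threading

    _local = threading.local()

    def setup_new_action_context(action_name, element_name, element_key):
        _local.job = (action_name, element_name, element_key)

    def annotate(message):
        # Messages emitted from a job thread automatically carry its identity
        job = getattr(_local, "job", None)
        if job is not None:
            message["action_name"], message["element_name"], message["element_key"] = job
        return message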
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index e1e6dcf..802e52b 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -88,10 +88,7 @@
         return elements
 
     def plan() -> List[Element]:
-        # Keep locally cached elements in the plan if remote artifact cache is used
-        # to allow pulling artifact with strict cache key, if available.
-        plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes()
-        return _Planner().plan(targets, plan_cached)
+        return _Planner().plan(targets)
 
     # Work around python not having a switch statement; this is
     # much clearer than the if/elif/else block we used to have.
@@ -293,9 +290,8 @@
 # _Planner()
 #
 # An internal object used for constructing build plan
-# from a given resolved toplevel element, while considering what
-# parts need to be built depending on build only dependencies
-# being cached, and depth sorting for more efficient processing.
+# from a given resolved toplevel element, using depth
+# sorting for more efficient processing.
 #
 class _Planner:
     def __init__(self):
@@ -319,15 +315,13 @@
         for dep in element._dependencies(_Scope.RUN, recurse=False):
             self.plan_element(dep, depth)
 
-        # Dont try to plan builds of elements that are cached already
-        if not element._cached_success():
-            for dep in element._dependencies(_Scope.BUILD, recurse=False):
-                self.plan_element(dep, depth + 1)
+        for dep in element._dependencies(_Scope.BUILD, recurse=False):
+            self.plan_element(dep, depth + 1)
 
         self.depth_map[element] = depth
         self.visiting_elements.remove(element)
 
-    def plan(self, roots, plan_cached):
+    def plan(self, roots):
         for root in roots:
             self.plan_element(root, 0)
 
@@ -337,4 +331,4 @@
         for index, item in enumerate(depth_sorted):
             item[0]._set_depth(index)
 
-        return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+        return [item[0] for item in depth_sorted]
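The simplified planner reduces to a depth-first walk in which runtime dependencies share their parent's depth and build dependencies sit one level deeper; roughly as below, where run_deps/build_deps are stand-in callables and the circular-dependency guard of the real _Planner is omitted:

    def plan(roots, run_deps, build_deps):
        depths = {}

        def visit(element, depth):
            if depths.get(element, -1) >= depth:
                return
            depths[element] = depth
            for dep in run_deps(element):
                visit(dep, depth)
            for dep in build_deps(element):
                visit(dep, depth + 1)

        for root in roots:
            visit(root, 0)

        # Deepest elements are processed first, as in the real depth sort
        return sorted(depths, key=depths.get, reverse=True)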
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c875afe..08f1a66 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
 import asyncio
 import datetime
 import itertools
-import multiprocessing
 import threading
 import traceback
 
@@ -32,7 +31,7 @@
 from ... import utils
 from ..._utils import terminate_thread
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
 from ...types import FastEnum
 from ..._signals import TerminateException
 
@@ -113,8 +112,6 @@
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
         self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
@@ -143,11 +140,7 @@
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@
         loop = asyncio.get_event_loop()
 
         async def execute():
-            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
             await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         if self._task:
             self._child.terminate()
 
@@ -289,16 +279,6 @@
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -347,51 +325,7 @@
 
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
-
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                message = self._pipe_r.recv()
-            except EOFError:
-                self._parent_stop_listening()
-                break
-
-            self._messenger.message(message)
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -432,7 +366,6 @@
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
         self._thread_id = None  # Thread in which the child executes its action
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
@@ -484,16 +417,12 @@
     #
     # Perform the action in the child process, this calls the action_cb.
     #
-    # Args:
-    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
-    #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
+    def child_action(self):
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._pipe_w = pipe_w
-        self._messenger.set_message_handler(self._child_message_handler)
+        self._messenger.setup_new_action_context(
+            self.action_name, self._message_element_name, self._message_element_key
+        )
 
         # Time, log and run the action function
         #
@@ -571,8 +500,6 @@
             except TerminateException:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
-            finally:
-                self._pipe_w.close()
 
     # terminate()
     #
@@ -592,36 +519,3 @@
                 return
 
         terminate_thread(self._thread_id)
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
-
-    # _child_message_handler()
-    #
-    # A Context delegate for handling messages, this replaces the
-    # frontend's main message handler in the context of a child task
-    # and performs local logging to the local log file before sending
-    # the message back to the parent process for further propagation.
-    # The related element display key is added to the message for
-    # widget rendering if not already set for an element childjob.
-    #
-    # Args:
-    #    message     (Message): The message to log
-    #    is_silenced (bool)   : Whether messages are silenced
-    #
-    def _child_message_handler(self, message, is_silenced):
-
-        message.action_name = self.action_name
-        message.task_element_name = self._message_element_name
-        message.task_element_key = self._message_element_key
-
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in unconditional_messages):
-            return
-
-        # Don't bother propagating these to the frontend
-        if message.message_type == MessageType.LOG:
-            return
-
-        self._pipe_w.send(message)
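With the pipe removed, the child action simply runs on a worker thread in the same process; schematically (assuming an `action` callable that returns the status/result pair):

    import asyncio

    async def run_child_action(action):
        # Illustrative only: the action runs on a worker thread of the same
        # process, so its messages flow through the shared Messenger rather
        # than being marshalled back over a multiprocessing pipe.
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, action)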
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 18bf392..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -50,13 +50,13 @@
             if not element._can_query_cache():
                 return QueueStatus.PENDING
 
-            if element._cached():
+            if element._cached_success():
                 return QueueStatus.SKIP
 
         # This will automatically skip elements which
         # have no sources.
 
-        if not element._should_fetch(self._should_fetch_original):
+        if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
             return QueueStatus.SKIP
 
         return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d6959..925ded0 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -21,7 +21,6 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
-from ..jobs import JobStatus
 from ..._exceptions import SkipJob
 
 
@@ -37,28 +36,15 @@
         return PullQueue._pull_or_skip
 
     def status(self, element):
-        if not element._can_query_cache():
-            return QueueStatus.PENDING
-
         if element._pull_pending():
             return QueueStatus.READY
         else:
             return QueueStatus.SKIP
 
     def done(self, _, element, result, status):
-
-        if status is JobStatus.FAIL:
-            return
-
-        element._pull_done()
-
-    def register_pending_element(self, element):
-        # Set a "can_query_cache"_callback for an element which is not
-        # immediately ready to query the artifact cache so that it
-        # may be pulled.
-        element._set_can_query_cache_callback(self._enqueue_element)
+        element._load_artifact_done()
 
     @staticmethod
     def _pull_or_skip(element):
-        if not element._pull():
+        if not element._load_artifact(pull=True):
             raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a44d613..2645171 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -51,7 +51,8 @@
 from ._state import State
 from .types import _KeyStrength, _PipelineSelection, _Scope, _HostMount
 from .plugin import Plugin
-from . import utils, _yaml, _site, _pipeline
+from . import utils, node, _yaml, _site, _pipeline
+from .downloadablefilesource import DownloadableFileSource
 
 
 # Stream()
@@ -115,6 +116,16 @@
         # Reset the element loader state
         Element._reset_load_state()
 
+        # Reset global state in node.pyx, this is for the sake of
+        # test isolation.
+        node._reset_global_state()
+
+        # Ensure that any global state loaded by the DownloadableFileSource
+        # is discarded in between sessions (different invocations of the CLI
+        # may come with different local state such as .netrc files, so we need
+        # a reset here).
+        DownloadableFileSource._reset_url_opener()
+
     # set_project()
     #
     # Set the top-level project.
@@ -177,6 +188,35 @@
             )
             return target_objects
 
+    # query_cache()
+    #
+    # Query the artifact and source caches to determine the cache status
+    # of the specified elements.
+    #
+    # Args:
+    #    elements (list of Element): The elements to check
+    #    sources (bool): True to only query the source cache
+    #
+    def query_cache(self, elements, *, sources=False):
+        with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+            # Enqueue the complete build plan, as this is required to determine `buildable` status.
+            plan = list(_pipeline.dependencies(elements, _Scope.ALL))
+
+            for element in plan:
+                if element._can_query_cache():
+                    # Cache status already available.
+                    # This is the case for artifact elements, which load the
+                    # artifact early on.
+                    pass
+                elif not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+                    element._load_artifact(pull=False)
+                    if not element._can_query_cache() or not element._cached_success():
+                        element._query_source_cache()
+                    if not element._pull_pending():
+                        element._load_artifact_done()
+                elif element._has_all_sources_resolved():
+                    element._query_source_cache()
+
     # shell()
     #
     # Run a shell
@@ -241,6 +281,8 @@
             element = self.targets[0]
             element._set_required(scope)
 
+            self.query_cache([element] + elements)
+
             if pull_:
                 self._reset()
                 self._add_queue(PullQueue(self._scheduler))
@@ -280,6 +322,7 @@
 
         # Ensure we have our sources if we are launching a build shell
         if scope == _Scope.BUILD and not usebuildtree:
+            self.query_cache([element], sources=True)
             self._fetch([element])
             _pipeline.assert_sources_cached(self._context, [element])
 
@@ -342,6 +385,8 @@
                 for element in self.targets:
                     element._set_artifact_files_required(scope=scope)
 
+        self.query_cache(elements)
+
         # Now construct the queues
         #
         self._reset()
@@ -393,6 +438,8 @@
             ignore_project_source_remotes=ignore_project_source_remotes,
         )
 
+        self.query_cache(elements, sources=True)
+
         # Delegated to a shared fetch method
         self._fetch(elements, announce_session=True)
 
@@ -465,6 +512,8 @@
             ignore_project_source_remotes=ignore_project_source_remotes,
         )
 
+        self.query_cache(elements, sources=True)
+
         if not self._sourcecache.has_push_remotes():
             raise StreamError("No source caches available for pushing sources")
 
@@ -513,6 +562,9 @@
             raise StreamError("No artifact caches available for pulling artifacts")
 
         _pipeline.assert_consistent(self._context, elements)
+
+        self.query_cache(elements)
+
         self._reset()
         self._add_queue(PullQueue(self._scheduler))
         self._enqueue_plan(elements)
@@ -559,6 +611,8 @@
 
         _pipeline.assert_consistent(self._context, elements)
 
+        self.query_cache(elements)
+
         self._reset()
         self._add_queue(PullQueue(self._scheduler))
         self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -621,6 +675,8 @@
 
         self._check_location_writable(location, force=force, tar=tar)
 
+        self.query_cache(elements)
+
         uncached_elts = [elt for elt in elements if not elt._cached()]
         if uncached_elts and pull:
             self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -699,6 +755,8 @@
             targets, selection=selection, connect_artifact_cache=True, load_artifacts=True
         )
 
+        self.query_cache(target_objects)
+
         if self._artifacts.has_fetch_remotes():
             self._resolve_cached_remotely(target_objects)
 
@@ -718,6 +776,8 @@
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         artifact_logs = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -746,6 +806,8 @@
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         elements_to_files = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -835,6 +897,7 @@
         )
 
         # Assert all sources are cached in the source dir
+        self.query_cache(elements, sources=True)
         self._fetch(elements)
         _pipeline.assert_sources_cached(self._context, elements)
 
@@ -885,6 +948,7 @@
         # If we're going to checkout, we need at least a fetch,
         #
         if not no_checkout:
+            self.query_cache(elements, sources=True)
             self._fetch(elements, fetch_original=True)
 
         expanded_directories = []
@@ -1547,6 +1611,8 @@
             for element in artifacts:
                 element._set_required(_Scope.NONE)
 
+            self.query_cache(artifacts)
+
             self._reset()
             self._add_queue(PullQueue(self._scheduler))
             self._enqueue_plan(artifacts)
@@ -1578,7 +1644,6 @@
 
         # Now move on to loading primary selection.
         #
-        self._resolve_elements(self.targets)
         selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False)
         selected = _pipeline.except_elements(self.targets, selected, except_elements)
 
@@ -1594,40 +1659,6 @@
 
         return selected
 
-    # _resolve_elements()
-    #
-    # Resolve element state and cache keys.
-    #
-    # Args:
-    #    targets (list of Element): The list of toplevel element targets
-    #
-    def _resolve_elements(self, targets):
-        with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
-            # We need to go through the project to access the loader
-            #
-            # FIXME: We need to calculate the total elements to resolve differently so that
-            #        it can include artifact elements
-            #
-            if task and self._project:
-                task.set_maximum_progress(self._project.loader.loaded)
-
-            # XXX: Now that Element._update_state() can trigger recursive update_state calls
-            # it is possible that we could get a RecursionError. However, this is unlikely
-            # to happen, even for large projects (tested with the Debian stack). Although,
-            # if it does become a problem we may have to set the recursion limit to a
-            # greater value.
-            for element in _pipeline.dependencies(targets, _Scope.ALL):
-                # Determine initial element state.
-                element._initialize_state()
-
-                # We may already have Elements which are cached and have their runtimes
-                # cached, if this is the case, we should immediately notify their reverse
-                # dependencies.
-                element._update_ready_for_runtime_and_cached()
-
-                if task:
-                    task.add_current_progress()
-
     # _reset()
     #
     # Resets the internal state related to a given scheduler run.
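The commands above now share a common shape: load the selection, call query_cache() on it, and only then consult element cache state. A hypothetical outline of that flow, where the reporting helper and the element accessor are shown only for illustration:

    def report_cached(stream, targets, selection):
        # Hypothetical outline; `_cached()` is an internal accessor and is
        # only safe to call once the cache status has been queried.
        elements = stream.load_selection(targets, selection=selection)
        stream.query_cache(elements)   # populate artifact/source cache status
        return {e: e._cached() for e in elements}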
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 503361e..6a296bf 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -226,7 +226,7 @@
         load_element: "LoadElement",
         plugin_conf: Dict[str, Any],
         *,
-        artifact: Artifact = None,
+        artifact_key: str = None,
     ):
 
         self.__cache_key_dict = None  # Dict for cache key calculation
@@ -290,7 +290,7 @@
         self.__sourcecache = context.sourcecache  # Source cache
         self.__assemble_scheduled = False  # Element is scheduled to be assembled
         self.__assemble_done = False  # Element is assembled
-        self.__pull_done = False  # Whether pull was attempted
+        self.__pull_pending = False  # Whether pull is pending
         self.__cached_successfully = None  # If the Element is known to be successfully cached
         self.__splits = None  # Resolved regex objects for computing split domains
         self.__whitelist_regex = None  # Resolved regex object to check if file is allowed to overlap
@@ -318,8 +318,8 @@
         self.__environment: Dict[str, str] = {}
         self.__variables: Optional[Variables] = None
 
-        if artifact:
-            self.__initialize_from_artifact(artifact)
+        if artifact_key:
+            self.__initialize_from_artifact_key(artifact_key)
         else:
             self.__initialize_from_yaml(load_element, plugin_conf)
 
@@ -1135,6 +1135,8 @@
 
         element.__preflight()
 
+        element._initialize_state()
+
         if task:
             task.add_current_progress()
 
@@ -1182,9 +1184,6 @@
     #            the artifact cache
     #
     def _cached(self):
-        if not self.__artifact:
-            return False
-
         return self.__artifact.cached()
 
     # _cached_remotely():
@@ -1300,7 +1299,17 @@
     #
     def _can_query_cache(self):
         # cache cannot be queried until strict cache key is available
-        return self.__strict_cache_key is not None
+        return self.__artifact is not None
+
+    # _can_query_source_cache():
+    #
+    # Returns whether the source cache status is available.
+    #
+    # Returns:
+    #    (bool): True if source cache can be queried
+    #
+    def _can_query_source_cache(self):
+        return self.__sources.can_query_cache()
 
     # _initialize_state()
     #
@@ -1328,9 +1337,6 @@
     #
     # - __update_cache_keys()
     #   - Computes the strong and weak cache keys.
-    # - _update_artifact_state()
-    #   - Computes the state of the element's artifact using the
-    #     cache key.
     # - __schedule_assembly_when_necessary()
     #   - Schedules assembly of an element, iff its current state
     #     allows/necessitates it
@@ -1588,7 +1594,9 @@
     def __should_schedule(self):
         # We're processing if we're already scheduled, we've
         # finished assembling or if we're waiting to pull.
-        processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
+        processing = (
+            self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
+        )
 
         # We should schedule a build when
         return (
@@ -1654,7 +1662,7 @@
             self.__artifact.set_cached()
             self.__cached_successfully = True
         else:
-            self.__artifact.reset_cached()
+            self.__artifact.query_cache()
 
         # When we're building in non-strict mode, we may have
         # assembled everything to this point without a strong cache
@@ -1863,83 +1871,91 @@
     #   (bool): Whether a pull operation is pending
     #
     def _pull_pending(self):
-        if self._get_workspace():
-            # Workspace builds are never pushed to artifact servers
-            return False
+        return self.__pull_pending
 
-        # Check whether the pull has been invoked with a specific subdir requested
-        # in user context, as to complete a partial artifact
-        pull_buildtrees = self._get_context().pull_buildtrees
-
-        if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
-            if pull_buildtrees:
-                # If we've specified a subdir, check if the subdir is cached locally
-                # or if it's possible to get
-                if self._cached_buildtree() or not self._buildtree_exists():
-                    return False
-            else:
-                return False
-
-        # Pull is pending if artifact remote server available
-        # and pull has not been attempted yet
-        return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
-
-    # _pull_done()
+    # _load_artifact_done()
     #
-    # Indicate that pull was attempted.
+    # Indicate that `_load_artifact()` has completed.
     #
-    # This needs to be called in the main process after a pull
+    # This needs to be called in the main process after `_load_artifact()`
     # succeeds or fails so that we properly update the main
     # process data model
     #
     # This will result in updating the element state.
     #
-    def _pull_done(self):
+    def _load_artifact_done(self):
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
-        self.__pull_done = True
+        assert self.__artifact
 
-        # Artifact may become cached after pulling, so let it query the
-        # filesystem again to check
-        self.__artifact.reset_cached()
+        context = self._get_context()
 
-        # We may not have actually pulled an artifact - the pull may
-        # have failed. We might therefore need to schedule assembly.
-        self.__schedule_assembly_when_necessary()
-        # If we've finished pulling, an artifact might now exist
-        # locally, so we might need to update a non-strict strong
-        # cache key.
-        self.__update_cache_key_non_strict()
+        if not context.get_strict() and self.__artifact.cached():
+            # In non-strict mode, strong cache key becomes available when
+            # the artifact is cached
+            self.__update_cache_key_non_strict()
+
         self._update_ready_for_runtime_and_cached()
 
-    # _pull():
+        self.__schedule_assembly_when_necessary()
+
+        if self.__can_query_cache_callback is not None:
+            self.__can_query_cache_callback(self)
+            self.__can_query_cache_callback = None
+
+    # _load_artifact():
     #
-    # Pull artifact from remote artifact repository into local artifact cache.
+    # Load artifact from cache or pull it from remote artifact repository.
     #
     # Returns: True if the artifact has been downloaded, False otherwise
     #
-    def _pull(self):
+    def _load_artifact(self, *, pull, strict=None):
         context = self._get_context()
 
-        # Get optional specific subdir to pull and optional list to not pull
-        # based off of user context
-        pull_buildtrees = context.pull_buildtrees
+        if strict is None:
+            strict = context.get_strict()
 
-        # Attempt to pull artifact without knowing whether it's available
-        strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
-        if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
-            # Notify successful download
-            return True
+        pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
 
-        if not context.get_strict() and not self._cached():
-            # In non-strict mode also try pulling weak artifact
-            # if no weak artifact is cached yet.
-            artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
-            return artifact.pull(pull_buildtrees=pull_buildtrees)
-        else:
-            # No artifact has been downloaded
+        # First check whether we already have the strict artifact in the local cache
+        artifact = Artifact(
+            self,
+            context,
+            strict_key=self.__strict_cache_key,
+            strong_key=self.__strict_cache_key,
+            weak_key=self.__weak_cache_key,
+        )
+        artifact.query_cache()
+
+        self.__pull_pending = False
+        if not pull and not artifact.cached(buildtree=pull_buildtrees):
+            if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace():
+                # Artifact is not completely available in cache and artifact remote server is available.
+                # Stop artifact loading here as pull is required to proceed.
+                self.__pull_pending = True
+
+        # Attempt to pull artifact with the strict cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        if artifact.cached() or strict:
+            self.__artifact = artifact
+            return pulled
+        elif self.__pull_pending:
             return False
 
+        # In non-strict mode retry with weak cache key
+        artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+        artifact.query_cache()
+
+        # Attempt to pull artifact with the weak cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        self.__artifact = artifact
+        return pulled
+
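+    # _query_source_cache():
+    #
+    # Query the source cache to determine whether this element's sources
+    # are cached.
+    #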
+    def _query_source_cache(self):
+        self.__sources.query_cache()
+
     def _skip_source_push(self):
         if not self.sources() or self._get_workspace():
             return True
@@ -2378,7 +2394,7 @@
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
         if not self.__ready_for_runtime_and_cached:
-            if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+            if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success():
                 self.__ready_for_runtime_and_cached = True
 
                 # Notify reverse dependencies
@@ -2595,6 +2611,7 @@
             return None
 
         artifact = Artifact(self, self._get_context(), strong_key=workspace.last_build)
+        artifact.query_cache()
 
         if not artifact.cached():
             return None
@@ -2862,13 +2879,33 @@
         self.__variables.expand(sandbox_config)
         self.__sandbox_config = SandboxConfig.new_from_node(sandbox_config, platform=context.platform)
 
-    # __initialize_from_artifact()
+    # __initialize_from_artifact_key()
     #
-    # Initialize the element state from an Artifact object
+    # Initialize the element state from an artifact key
     #
-    def __initialize_from_artifact(self, artifact: Artifact):
-        self.__artifact = artifact
-        self._mimic_artifact()
+    def __initialize_from_artifact_key(self, key: str):
+        # At this point we only know the key which was specified on the command line,
+        # so we will pretend all keys are equal.
+        #
+        # If the artifact is cached, then the real keys will be loaded from the
+        # artifact in `_load_artifact()` and `_load_artifact_done()`.
+        #
+        self.__cache_key = key
+        self.__strict_cache_key = key
+        self.__weak_cache_key = key
+
+        self._initialize_state()
+
+        # ArtifactElement requires access to the artifact early on to walk
+        # dependencies.
+        self._load_artifact(pull=False)
+
+        if not self._cached():
+            # Remotes are not initialized when artifact elements are loaded.
+            # Always consider pull pending if the artifact is not cached.
+            self.__pull_pending = True
+        else:
+            self._load_artifact_done()
 
     @classmethod
     def __compose_default_splits(cls, project, defaults, first_pass):
@@ -3244,59 +3281,10 @@
             # In strict mode, the strong cache key always matches the strict cache key
             self.__cache_key = self.__strict_cache_key
 
-        # If we've newly calculated a cache key, our artifact's
-        # current state will also change - after all, we can now find
-        # a potential existing artifact.
-        self.__update_artifact_state()
-
         # Update the message kwargs in use for this plugin to dispatch messages with
         #
         self._message_kwargs["element_key"] = self._get_display_key()
 
-    # __update_artifact_state()
-    #
-    # Updates the data involved in knowing about the artifact corresponding
-    # to this element.
-    #
-    # If the state changes, this will subsequently call
-    # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
-    # possible.
-    #
-    # Element.__update_cache_keys() must be called before this to have
-    # meaningful results, because the element must know its cache key before
-    # it can check whether an artifact exists for that cache key.
-    #
-    def __update_artifact_state(self):
-        assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
-        assert self.__artifact is None
-
-        context = self._get_context()
-
-        strict_artifact = Artifact(
-            self,
-            context,
-            strong_key=self.__strict_cache_key,
-            strict_key=self.__strict_cache_key,
-            weak_key=self.__weak_cache_key,
-        )
-        if context.get_strict() or strict_artifact.cached():
-            self.__artifact = strict_artifact
-        else:
-            self.__artifact = Artifact(
-                self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
-            )
-
-        if not context.get_strict() and self.__artifact.cached():
-            # In non-strict mode, strong cache key becomes available when
-            # the artifact is cached
-            self.__update_cache_key_non_strict()
-
-        self.__schedule_assembly_when_necessary()
-
-        if self.__can_query_cache_callback is not None:
-            self.__can_query_cache_callback(self)
-            self.__can_query_cache_callback = None
-
     # __update_cache_key_non_strict()
     #
     # Calculates the strong cache key if it hasn't already been set.
@@ -3305,7 +3293,7 @@
     # strict cache key, so no work needs to be done.
     #
     # When buildstream is not run in strict mode, this requires the artifact
-    # state (as set in Element.__update_artifact_state()) to be set accordingly,
+    # state (as set in Element._load_artifact()) to be set accordingly,
     # as the cache key can be loaded from the cache (possibly pulling from
     # a remote cache).
     #
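Condensed to a decision table, the new _load_artifact() behaves roughly as follows (an illustrative sketch ignoring buildtrees, workspaces and the actual pull attempts):

    def load_artifact_outcome(strict_cached, *, pull, strict, has_remotes):
        # Returns (use_weak_key, pull_pending) for the given cache state.
        pull_pending = not pull and not strict_cached and has_remotes
        if strict_cached or strict:
            return False, pull_pending   # keep the strict-key artifact
        if pull_pending:
            return False, pull_pending   # a later pull may still resolve it
        return True, pull_pending        # non-strict: fall back to the weak key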
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 095a5f6..927954d 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -21,8 +21,6 @@
 import psutil
 import pytest
 
-from buildstream import node, DownloadableFileSource
-
 
 # Number of seconds to wait for background threads to exit.
 _AWAIT_THREADS_TIMEOUT_SECONDS = 5
@@ -54,10 +52,3 @@
     assert has_no_unexpected_background_threads(default_thread_number)
     yield
     assert has_no_unexpected_background_threads(default_thread_number)
-
-
-# Reset global state in node.pyx to improve test isolation
-@pytest.fixture(autouse=True)
-def reset_global_node_state():
-    node._reset_global_state()
-    DownloadableFileSource._reset_url_opener()
diff --git a/src/buildstream/testing/_sourcetests/conftest.py b/src/buildstream/testing/_sourcetests/conftest.py
index 6790712..fead43a 100644
--- a/src/buildstream/testing/_sourcetests/conftest.py
+++ b/src/buildstream/testing/_sourcetests/conftest.py
@@ -14,4 +14,4 @@
 #  You should have received a copy of the GNU Lesser General Public
 #  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 
-from .._fixtures import reset_global_node_state, default_thread_number, thread_check  # pylint: disable=unused-import
+from .._fixtures import default_thread_number, thread_check  # pylint: disable=unused-import
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 858065d..24a1ee1 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -6,7 +6,6 @@
 import pytest
 
 from buildstream import _yaml
-from buildstream.types import _Scope
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildstream.testing import cli  # pylint: disable=unused-import
@@ -33,15 +32,12 @@
         # Create a local artifact cache handle
         artifactcache = context.artifactcache
 
-        # Ensure the element's artifact memeber is initialised
-        # This is duplicated from Pipeline.resolve_elements()
-        # as this test does not use the cli frontend.
-        for e in element._dependencies(_Scope.ALL):
-            e._initialize_state()
-
         # Initialize remotes
         context.initialize_remotes(True, True, None, None)
 
+        # Query local cache
+        element._load_artifact(pull=False)
+
         assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
         assert element._push(), "Push operation failed"
 
diff --git a/tests/conftest.py b/tests/conftest.py
index d79ad40..e3b13c9 100755
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -27,7 +27,6 @@
 from buildstream.testing import register_repo_kind, sourcetests_collection_hook
 from buildstream.testing._fixtures import (  # pylint: disable=unused-import
     default_thread_number,
-    reset_global_node_state,
     thread_check,
 )
 from buildstream.testing.integration import integration_cache  # pylint: disable=unused-import
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@
 def test_fetch_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # When the error occurs outside of the scheduler at load time,
-    # then the SourceError is reported directly as the main error.
     result = cli.run(project=project, args=["source", "fetch", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@
     project = str(datafiles)
 
     result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index a32c747..8919330 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -372,8 +372,8 @@
         #
         result = cli.run(project=project, args=["build", "target.bst"])
         result.assert_success()
-        assert result.get_pulled_elements() == ["target.bst"]
-        assert result.get_pushed_elements() == []
+        assert "target.bst" in result.get_pulled_elements()
+        assert "target.bst" not in result.get_pushed_elements()
 
         # Delete the artifact locally again.
         cli.remove_artifact_from_cache(project, "target.bst")
@@ -385,8 +385,8 @@
         )
         result = cli.run(project=project, args=["build", "target.bst"])
         result.assert_success()
-        assert result.get_pulled_elements() == ["target.bst"]
-        assert result.get_pushed_elements() == ["target.bst"]
+        assert "target.bst" in result.get_pulled_elements()
+        assert "target.bst" in result.get_pushed_elements()
 
 
 # Ensure that when an artifact's size exceeds available disk space
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index 3dd686d..950cd83 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@
 def test_track_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing a consistency error
+    # Track the element causing a consistency error in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
 def test_track_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing an unhandled exception
+    # Track the element causing an unhandled exception in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "bug.bst"])
 
-    # We expect BuildStream to fail gracefully, with no recorded exception.
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 86dac0b..26f808d 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -74,7 +74,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -114,9 +114,9 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
 
             # check that we have the source in the cas now and it's not fetched
+            element._query_source_cache()
             assert element._cached_sources()
             assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
 
@@ -134,7 +134,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -151,8 +151,11 @@
             assert ("SUCCESS Fetching from {}".format(repo.source_config(ref=ref)["url"])) in res.stderr
 
             # Check that the source is both in the source dir and the local CAS
+            project = Project(project_dir, context)
+            project.ensure_fully_loaded()
+
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert element._cached_sources()
 
 
@@ -168,7 +171,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -200,7 +203,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index edbcfdf..47e845b 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -86,7 +86,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements(["push.bst"])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -136,7 +136,7 @@
             project.ensure_fully_loaded()
 
             element = project.load_elements(["push.bst"])[0]
-            element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..6671c79 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -64,7 +64,7 @@
         # now check that the source is in the refs file, this is pretty messy but
         # seems to be the only way to get the sources?
         element = project.load_elements(["import-bin.bst"])[0]
-        element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
         assert sourcecache.contains(source)
@@ -99,7 +99,7 @@
         sourcecache = context.sourcecache
 
         element = project.load_elements(["import-dev.bst"])[0]
-        element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
 
@@ -133,9 +133,9 @@
         project.ensure_fully_loaded()
 
         element = project.load_elements(["import-dev.bst"])[0]
-        element._initialize_state()
 
         # check consistency of the source
+        element._query_source_cache()
         assert not element._cached_sources()
 
     res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@
     result = cli.run(project=project, args=["source", "fetch", "target.bst"])
     result.assert_success()
 
-    # Track will encounter an inconsistent submodule without any ref
+    # Track to update to the offending commit
     result = cli.run(project=project, args=["source", "track", "target.bst"])
     result.assert_success()
 
+    # Fetch after track will encounter an inconsistent submodule without any ref
+    result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+    result.assert_success()
+
     # Assert that we are just fine without it, and emit a warning to the user.
     assert "Ignoring inconsistent submodule" in result.stderr