Merge pull request #1446 from wjt/patch-3
doc/source/using_config.rst: fix "it's" typos
diff --git a/NEWS b/NEWS
index 989d587..efbbd74 100644
--- a/NEWS
+++ b/NEWS
@@ -2,6 +2,11 @@
(unreleased)
============
+
+==================
+buildstream 1.93.6
+==================
+
Format
------
o The `script` element no longer has a `layout` configuration directly, and now
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index d4a716f..da1e03a 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -540,7 +540,7 @@
return dependency_refs
- # cached():
+ # query_cache():
#
# Check whether the artifact corresponding to the stored cache key is
# available. This also checks whether all required parts of the artifact
@@ -550,11 +550,7 @@
# Returns:
# (bool): Whether artifact is in local cache
#
- def cached(self):
-
- if self._cached is not None:
- return self._cached
-
+ def query_cache(self):
context = self._context
artifact = self._load_proto()
@@ -587,6 +583,21 @@
self._cached = True
return True
+ # cached()
+ #
+ # Return whether the artifact is available in the local cache. This must
+ # be called after `query_cache()` or `set_cached()`.
+ #
+ # Returns:
+ # (bool): Whether artifact is in local cache
+ #
+ def cached(self, *, buildtree=False):
+ assert self._cached is not None
+ ret = self._cached
+ if buildtree:
+ ret = ret and (self.cached_buildtree() or not self.buildtree_exists())
+ return ret
+
# cached_logs()
#
# Check if the artifact is cached with log files.
@@ -600,15 +611,6 @@
# If the artifact is cached, its log files are available as well.
return self._element._cached()
- # reset_cached()
- #
- # Allow the Artifact to query the filesystem to determine whether it
- # is cached or not.
- #
- def reset_cached(self):
- self._proto = None
- self._cached = None
-
# set_cached()
#
# Mark the artifact as cached without querying the filesystem.
diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py
index 06e48c1..cf7f55d 100644
--- a/src/buildstream/_artifactelement.py
+++ b/src/buildstream/_artifactelement.py
@@ -51,17 +51,10 @@
def __init__(self, context, ref):
project_name, element_name, key = verify_artifact_ref(ref)
- # At this point we only know the key which was specified on the command line,
- # so we will pretend all keys are equal.
- #
- # If the artifact is cached, then the real keys will be loaded from the
- # artifact instead.
- #
- artifact = Artifact(self, context, strong_key=key, strict_key=key, weak_key=key)
project = ArtifactProject(project_name, context)
load_element = LoadElement(Node.from_dict({}), element_name, project.loader) # NOTE element has no .bst suffix
- super().__init__(context, project, load_element, None, artifact=artifact)
+ super().__init__(context, project, load_element, None, artifact_key=key)
########################################################
# Public API #
@@ -128,12 +121,17 @@
# Override internal Element methods #
########################################################
- # Once we've finished pulling an artifact, we assume the
- # state of the pulled artifact.
+ def _load_artifact(self, *, pull, strict=None): # pylint: disable=useless-super-delegation
+ # Always operate in strict mode as artifact key has been specified explicitly.
+ return super()._load_artifact(pull=pull, strict=True)
+
+ # Once we've finished loading an artifact, we assume the
+ # state of the loaded artifact. This is also used if the
+ # artifact is loaded after pulling.
#
- def _pull_done(self):
- super()._pull_done()
+ def _load_artifact_done(self):
self._mimic_artifact()
+ super()._load_artifact_done()
########################################################
# Implement Element abstract methods #
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index a15d20e..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@
# SourceError: If one of the element sources has an error
#
def fetch(self):
+ if self._cached is None:
+ self.query_cache()
+
if self.cached():
return
@@ -293,7 +296,7 @@
length = min(len(key), context.log_key_length)
return key[:length]
- # cached():
+ # query_cache():
#
# Check if the element sources are cached in CAS, generating the source
# cache keys if needed.
@@ -301,10 +304,7 @@
# Returns:
# (bool): True if the element sources are cached
#
- def cached(self):
- if self._cached is not None:
- return self._cached
-
+ def query_cache(self):
cas = self._context.get_cascache()
elementsourcescache = self._elementsourcescache
@@ -321,6 +321,28 @@
self._cached = True
return True
+ # can_query_cache():
+ #
+ # Returns whether the cache status is available.
+ #
+ # Returns:
+ # (bool): True if cache status is available
+ #
+ def can_query_cache(self):
+ return self._cached is not None
+
+ # cached()
+ #
+ # Return whether the element sources are cached in CAS. This must be
+ # called after `query_cache()`, once all sources are resolved.
+ #
+ # Returns:
+ # (bool): True if the element sources are cached
+ #
+ def cached(self):
+ assert self._cached is not None
+ return self._cached
+
# is_resolved():
#
# Get whether all sources of the element are resolved
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 6a9a18b..f20d98f 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -606,6 +606,8 @@
dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
+ app.stream.query_cache(dependencies)
+
if order == "alpha":
dependencies = sorted(dependencies)
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 99c9f42..589e548 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -357,10 +357,14 @@
else:
if element.get_kind() == "junction":
line = p.fmt_subst(line, "state", "junction", fg="magenta")
+ elif not element._can_query_cache():
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._cached_failure():
line = p.fmt_subst(line, "state", "failed", fg="red")
elif element._cached_success():
line = p.fmt_subst(line, "state", "cached", fg="magenta")
+ elif not element._can_query_source_cache():
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._fetch_needed():
line = p.fmt_subst(line, "state", "fetch needed", fg="red")
elif element._buildable():
diff --git a/src/buildstream/_loader/loadelement.pyx b/src/buildstream/_loader/loadelement.pyx
index 210869e..f69e138 100644
--- a/src/buildstream/_loader/loadelement.pyx
+++ b/src/buildstream/_loader/loadelement.pyx
@@ -286,7 +286,6 @@
from ..element import Element
element = Element._new_from_load_element(self)
- element._initialize_state()
# Custom error for link dependencies, since we don't completely
# parse their dependencies we cannot rely on the built-in ElementError.
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 48fb1c4..df4df84 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -816,7 +816,6 @@
)
element = Element._new_from_load_element(load_element)
- element._initialize_state()
# Handle the case where a subproject has no ref
#
@@ -830,6 +829,7 @@
# Handle the case where a subproject needs to be fetched
#
+ element._query_source_cache()
if element._should_fetch():
self.load_context.fetch_subprojects([element])
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 3bd98cd..edb79ec 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,7 @@
from . import _signals
from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
from ._state import State, Task
@@ -48,6 +48,13 @@
self.start_time: datetime.datetime = start_time
+class _JobInfo:
+ def __init__(self, action_name: str, element_name: str, element_key: str) -> None:
+ self.action_name = action_name
+ self.element_name = element_name
+ self.element_key = element_key
+
+
# _MessengerLocal
#
# Thread local storage for the messenger
@@ -56,13 +63,6 @@
def __init__(self) -> None:
super().__init__()
- # The callback to call when propagating messages
- #
- # FIXME: The message handler is currently not strongly typed,
- # as it uses a kwarg, we cannot declare it with Callable.
- # We can use `Protocol` to strongly type this with python >= 3.8
- self.message_handler = None
-
# The open file handle for this task
self.log_handle: Optional[TextIO] = None
@@ -72,6 +72,9 @@
# Level of silent messages depth in this task
self.silence_scope_depth: int = 0
+ # Context of the job running in this thread, or None outside of a job
+ self.job: Optional[_JobInfo] = None
+
# Messenger()
#
@@ -97,13 +100,24 @@
# Thread local storage
self._locals: _MessengerLocal = _MessengerLocal()
+ # The callback to call when propagating messages
+ #
+ # FIXME: The message handler is currently not strongly typed,
+ # as it uses a kwarg, we cannot declare it with Callable.
+ # We can use `Protocol` to strongly type this with python >= 3.8
+ self._message_handler = None
+
+ def setup_new_action_context(self, action_name: str, element_name: str, element_key: str) -> None:
+ self._locals.silence_scope_depth = 0
+ self._locals.job = _JobInfo(action_name, element_name, element_key)
+
# set_message_handler()
#
# Sets the handler for any status messages propagated through
# the messenger.
#
def set_message_handler(self, handler) -> None:
- self._locals.message_handler = handler
+ self._message_handler = handler
# set_state()
#
@@ -137,12 +151,31 @@
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
+ # Always add the log filename automatically
+ message.logfile = self._locals.log_filename
+
+ is_silenced = self._silent_messages()
+ job = self._locals.job
+
+ if job is not None:
+ # Automatically add message information from the job context
+ message.action_name = job.action_name
+ message.task_element_name = job.element_name
+ message.task_element_key = job.element_key
+
+ # Don't forward LOG messages from jobs
+ if message.message_type == MessageType.LOG:
+ return
+
+ # Don't forward JOB messages if they are currently silent
+ if is_silenced and (message.message_type not in unconditional_messages):
+ return
+
# Send it off to the log handler (can be the frontend,
# or it can be the child task which will propagate
# to the frontend)
- assert self._locals.message_handler
-
- self._locals.message_handler(message, is_silenced=self._silent_messages())
+ assert self._message_handler
+ self._message_handler(message, is_silenced=is_silenced)
# status():
#
@@ -362,7 +395,6 @@
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
- self._locals.silence_scope_depth = 0
# Ensure the directory exists first
directory = os.path.dirname(self._locals.log_filename)
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index e1e6dcf..802e52b 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -88,10 +88,7 @@
return elements
def plan() -> List[Element]:
- # Keep locally cached elements in the plan if remote artifact cache is used
- # to allow pulling artifact with strict cache key, if available.
- plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes()
- return _Planner().plan(targets, plan_cached)
+ return _Planner().plan(targets)
# Work around python not having a switch statement; this is
# much clearer than the if/elif/else block we used to have.
@@ -293,9 +290,8 @@
# _Planner()
#
# An internal object used for constructing build plan
-# from a given resolved toplevel element, while considering what
-# parts need to be built depending on build only dependencies
-# being cached, and depth sorting for more efficient processing.
+# from a given resolved toplevel element, using depth
+# sorting for more efficient processing.
#
class _Planner:
def __init__(self):
@@ -319,15 +315,13 @@
for dep in element._dependencies(_Scope.RUN, recurse=False):
self.plan_element(dep, depth)
- # Dont try to plan builds of elements that are cached already
- if not element._cached_success():
- for dep in element._dependencies(_Scope.BUILD, recurse=False):
- self.plan_element(dep, depth + 1)
+ for dep in element._dependencies(_Scope.BUILD, recurse=False):
+ self.plan_element(dep, depth + 1)
self.depth_map[element] = depth
self.visiting_elements.remove(element)
- def plan(self, roots, plan_cached):
+ def plan(self, roots):
for root in roots:
self.plan_element(root, 0)
@@ -337,4 +331,4 @@
for index, item in enumerate(depth_sorted):
item[0]._set_depth(index)
- return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+ return [item[0] for item in depth_sorted]
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c875afe..08f1a66 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
import asyncio
import datetime
import itertools
-import multiprocessing
import threading
import traceback
@@ -32,7 +31,7 @@
from ... import utils
from ..._utils import terminate_thread
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
from ...types import FastEnum
from ..._signals import TerminateException
@@ -113,8 +112,6 @@
#
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
- self._pipe_r = None # The read end of a pipe for message passing
- self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
@@ -143,11 +140,7 @@
assert not self._terminated, "Attempted to start process which was already terminated"
- # FIXME: remove this, this is not necessary when using asyncio
- self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
self._tries += 1
- self._parent_start_listening()
# FIXME: remove the parent/child separation, it's not needed anymore.
self._child = self.create_child_job( # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@
loop = asyncio.get_event_loop()
async def execute():
- ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+ ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
await self._parent_child_completed(ret_code)
self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@
def terminate(self):
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
- # Make sure there is no garbage on the pipe
- self._parent_stop_listening()
-
if self._task:
self._child.terminate()
@@ -289,16 +279,6 @@
# Local Private Methods #
#######################################################
- # _parent_shutdown()
- #
- # Shuts down the Job on the parent side by reading any remaining
- # messages on the message pipe and cleaning up any resources.
- #
- def _parent_shutdown(self):
- # Make sure we've read everything we need and then stop listening
- self._parent_process_pipe()
- self._parent_stop_listening()
-
# _parent_child_completed()
#
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@
# returncode (int): The return code of the child process
#
async def _parent_child_completed(self, returncode):
- self._parent_shutdown()
-
try:
returncode = _ReturnCode(returncode)
except ValueError:
@@ -347,51 +325,7 @@
self.parent_complete(status, self._result)
self._scheduler.job_completed(self, status)
-
- # Force the deletion of the pipe and process objects to try and clean up FDs
- self._pipe_r.close()
- self._pipe_r = self._task = None
-
- # _parent_process_pipe()
- #
- # Reads back message envelopes from the message pipe
- # in the parent process.
- #
- def _parent_process_pipe(self):
- while self._pipe_r.poll():
- try:
- message = self._pipe_r.recv()
- except EOFError:
- self._parent_stop_listening()
- break
-
- self._messenger.message(message)
-
- # _parent_recv()
- #
- # A callback to handle I/O events from the message
- # pipe file descriptor in the main process message loop
- #
- def _parent_recv(self, *args):
- self._parent_process_pipe()
-
- # _parent_start_listening()
- #
- # Starts listening on the message pipe
- #
- def _parent_start_listening(self):
- if not self._listening:
- self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
- self._listening = True
-
- # _parent_stop_listening()
- #
- # Stops listening on the message pipe
- #
- def _parent_stop_listening(self):
- if self._listening:
- self._scheduler.loop.remove_reader(self._pipe_r.fileno())
- self._listening = False
+ self._task = None
# ChildJob()
@@ -432,7 +366,6 @@
self._message_element_name = message_element_name
self._message_element_key = message_element_key
- self._pipe_w = None # The write end of a pipe for message passing
self._thread_id = None # Thread in which the child executes its action
self._should_terminate = False
self._terminate_lock = threading.Lock()
@@ -484,16 +417,12 @@
#
# Perform the action in the child process, this calls the action_cb.
#
- # Args:
- # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
- #
- def child_action(self, pipe_w):
- # Assign the pipe we passed across the process boundaries
- #
+ def child_action(self):
# Set the global message handler in this child
# process to forward messages to the parent process
- self._pipe_w = pipe_w
- self._messenger.set_message_handler(self._child_message_handler)
+ self._messenger.setup_new_action_context(
+ self.action_name, self._message_element_name, self._message_element_key
+ )
# Time, log and and run the action function
#
@@ -571,8 +500,6 @@
except TerminateException:
self._thread_id = None
return _ReturnCode.TERMINATED, None
- finally:
- self._pipe_w.close()
# terminate()
#
@@ -592,36 +519,3 @@
return
terminate_thread(self._thread_id)
-
- #######################################################
- # Local Private Methods #
- #######################################################
-
- # _child_message_handler()
- #
- # A Context delegate for handling messages, this replaces the
- # frontend's main message handler in the context of a child task
- # and performs local logging to the local log file before sending
- # the message back to the parent process for further propagation.
- # The related element display key is added to the message for
- # widget rendering if not already set for an element childjob.
- #
- # Args:
- # message (Message): The message to log
- # is_silenced (bool) : Whether messages are silenced
- #
- def _child_message_handler(self, message, is_silenced):
-
- message.action_name = self.action_name
- message.task_element_name = self._message_element_name
- message.task_element_key = self._message_element_key
-
- # Send to frontend if appropriate
- if is_silenced and (message.message_type not in unconditional_messages):
- return
-
- # Don't bother propagating these to the frontend
- if message.message_type == MessageType.LOG:
- return
-
- self._pipe_w.send(message)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 18bf392..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -50,13 +50,13 @@
if not element._can_query_cache():
return QueueStatus.PENDING
- if element._cached():
+ if element._cached_success():
return QueueStatus.SKIP
# This will automatically skip elements which
# have no sources.
- if not element._should_fetch(self._should_fetch_original):
+ if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
return QueueStatus.SKIP
return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d6959..925ded0 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -21,7 +21,6 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
-from ..jobs import JobStatus
from ..._exceptions import SkipJob
@@ -37,28 +36,15 @@
return PullQueue._pull_or_skip
def status(self, element):
- if not element._can_query_cache():
- return QueueStatus.PENDING
-
if element._pull_pending():
return QueueStatus.READY
else:
return QueueStatus.SKIP
def done(self, _, element, result, status):
-
- if status is JobStatus.FAIL:
- return
-
- element._pull_done()
-
- def register_pending_element(self, element):
- # Set a "can_query_cache"_callback for an element which is not
- # immediately ready to query the artifact cache so that it
- # may be pulled.
- element._set_can_query_cache_callback(self._enqueue_element)
+ element._load_artifact_done()
@staticmethod
def _pull_or_skip(element):
- if not element._pull():
+ if not element._load_artifact(pull=True):
raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a44d613..2645171 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -51,7 +51,8 @@
from ._state import State
from .types import _KeyStrength, _PipelineSelection, _Scope, _HostMount
from .plugin import Plugin
-from . import utils, _yaml, _site, _pipeline
+from . import utils, node, _yaml, _site, _pipeline
+from .downloadablefilesource import DownloadableFileSource
# Stream()
@@ -115,6 +116,16 @@
# Reset the element loader state
Element._reset_load_state()
+ # Reset global state in node.pyx; this is for the sake of
+ # test isolation.
+ node._reset_global_state()
+
+ # Ensure that any global state loaded by the downloadablefilesource
+ # is discarded in between sessions (different invocations of the CLI
+ # may come with different local state such as .netrc files, so we need
+ # a reset here).
+ DownloadableFileSource._reset_url_opener()
+
# set_project()
#
# Set the top-level project.
@@ -177,6 +188,35 @@
)
return target_objects
+ # query_cache()
+ #
+ # Query the artifact and source caches to determine the cache status
+ # of the specified elements.
+ #
+ # Args:
+ # elements (list of Element): The elements to check
+ # sources (bool): True to only query the source cache
+ #
+ def query_cache(self, elements, *, sources=False):
+ with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+ # Collect the complete build plan, as this is required to determine `buildable` status.
+ plan = list(_pipeline.dependencies(elements, _Scope.ALL))
+
+ for element in plan:
+ if element._can_query_cache():
+ # Cache status already available.
+ # This is the case for artifact elements, which load the
+ # artifact early on.
+ pass
+ elif not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+ element._load_artifact(pull=False)
+ if not element._can_query_cache() or not element._cached_success():
+ element._query_source_cache()
+ if not element._pull_pending():
+ element._load_artifact_done()
+ elif element._has_all_sources_resolved():
+ element._query_source_cache()
+
# shell()
#
# Run a shell
@@ -241,6 +281,8 @@
element = self.targets[0]
element._set_required(scope)
+ self.query_cache([element] + elements)
+
if pull_:
self._reset()
self._add_queue(PullQueue(self._scheduler))
@@ -280,6 +322,7 @@
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
+ self.query_cache([element], sources=True)
self._fetch([element])
_pipeline.assert_sources_cached(self._context, [element])
@@ -342,6 +385,8 @@
for element in self.targets:
element._set_artifact_files_required(scope=scope)
+ self.query_cache(elements)
+
# Now construct the queues
#
self._reset()
@@ -393,6 +438,8 @@
ignore_project_source_remotes=ignore_project_source_remotes,
)
+ self.query_cache(elements, sources=True)
+
# Delegated to a shared fetch method
self._fetch(elements, announce_session=True)
@@ -465,6 +512,8 @@
ignore_project_source_remotes=ignore_project_source_remotes,
)
+ self.query_cache(elements, sources=True)
+
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
@@ -513,6 +562,9 @@
raise StreamError("No artifact caches available for pulling artifacts")
_pipeline.assert_consistent(self._context, elements)
+
+ self.query_cache(elements)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -559,6 +611,8 @@
_pipeline.assert_consistent(self._context, elements)
+ self.query_cache(elements)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -621,6 +675,8 @@
self._check_location_writable(location, force=force, tar=tar)
+ self.query_cache(elements)
+
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -699,6 +755,8 @@
targets, selection=selection, connect_artifact_cache=True, load_artifacts=True
)
+ self.query_cache(target_objects)
+
if self._artifacts.has_fetch_remotes():
self._resolve_cached_remotely(target_objects)
@@ -718,6 +776,8 @@
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
artifact_logs = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -746,6 +806,8 @@
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
elements_to_files = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -835,6 +897,7 @@
)
# Assert all sources are cached in the source dir
+ self.query_cache(elements, sources=True)
self._fetch(elements)
_pipeline.assert_sources_cached(self._context, elements)
@@ -885,6 +948,7 @@
# If we're going to checkout, we need at least a fetch,
#
if not no_checkout:
+ self.query_cache(elements, sources=True)
self._fetch(elements, fetch_original=True)
expanded_directories = []
@@ -1547,6 +1611,8 @@
for element in artifacts:
element._set_required(_Scope.NONE)
+ self.query_cache(artifacts)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(artifacts)
@@ -1578,7 +1644,6 @@
# Now move on to loading primary selection.
#
- self._resolve_elements(self.targets)
selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False)
selected = _pipeline.except_elements(self.targets, selected, except_elements)
@@ -1594,40 +1659,6 @@
return selected
- # _resolve_elements()
- #
- # Resolve element state and cache keys.
- #
- # Args:
- # targets (list of Element): The list of toplevel element targets
- #
- def _resolve_elements(self, targets):
- with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
- # We need to go through the project to access the loader
- #
- # FIXME: We need to calculate the total elements to resolve differently so that
- # it can include artifact elements
- #
- if task and self._project:
- task.set_maximum_progress(self._project.loader.loaded)
-
- # XXX: Now that Element._update_state() can trigger recursive update_state calls
- # it is possible that we could get a RecursionError. However, this is unlikely
- # to happen, even for large projects (tested with the Debian stack). Although,
- # if it does become a problem we may have to set the recursion limit to a
- # greater value.
- for element in _pipeline.dependencies(targets, _Scope.ALL):
- # Determine initial element state.
- element._initialize_state()
-
- # We may already have Elements which are cached and have their runtimes
- # cached, if this is the case, we should immediately notify their reverse
- # dependencies.
- element._update_ready_for_runtime_and_cached()
-
- if task:
- task.add_current_progress()
-
# _reset()
#
# Resets the internal state related to a given scheduler run.
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 503361e..6a296bf 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -226,7 +226,7 @@
load_element: "LoadElement",
plugin_conf: Dict[str, Any],
*,
- artifact: Artifact = None,
+ artifact_key: str = None,
):
self.__cache_key_dict = None # Dict for cache key calculation
@@ -290,7 +290,7 @@
self.__sourcecache = context.sourcecache # Source cache
self.__assemble_scheduled = False # Element is scheduled to be assembled
self.__assemble_done = False # Element is assembled
- self.__pull_done = False # Whether pull was attempted
+ self.__pull_pending = False # Whether pull is pending
self.__cached_successfully = None # If the Element is known to be successfully cached
self.__splits = None # Resolved regex objects for computing split domains
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
@@ -318,8 +318,8 @@
self.__environment: Dict[str, str] = {}
self.__variables: Optional[Variables] = None
- if artifact:
- self.__initialize_from_artifact(artifact)
+ if artifact_key:
+ self.__initialize_from_artifact_key(artifact_key)
else:
self.__initialize_from_yaml(load_element, plugin_conf)
@@ -1135,6 +1135,8 @@
element.__preflight()
+ element._initialize_state()
+
if task:
task.add_current_progress()
@@ -1182,9 +1184,6 @@
# the artifact cache
#
def _cached(self):
- if not self.__artifact:
- return False
-
return self.__artifact.cached()
# _cached_remotely():
@@ -1300,7 +1299,17 @@
#
def _can_query_cache(self):
# cache cannot be queried until strict cache key is available
- return self.__strict_cache_key is not None
+ return self.__artifact is not None
+
+ # _can_query_source_cache():
+ #
+ # Returns whether the source cache status is available.
+ #
+ # Returns:
+ # (bool): True if source cache can be queried
+ #
+ def _can_query_source_cache(self):
+ return self.__sources.can_query_cache()
# _initialize_state()
#
@@ -1328,9 +1337,6 @@
#
# - __update_cache_keys()
# - Computes the strong and weak cache keys.
- # - _update_artifact_state()
- # - Computes the state of the element's artifact using the
- # cache key.
# - __schedule_assembly_when_necessary()
# - Schedules assembly of an element, iff its current state
# allows/necessitates it
@@ -1588,7 +1594,9 @@
def __should_schedule(self):
# We're processing if we're already scheduled, we've
# finished assembling or if we're waiting to pull.
- processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
+ processing = (
+ self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
+ )
# We should schedule a build when
return (
@@ -1654,7 +1662,7 @@
self.__artifact.set_cached()
self.__cached_successfully = True
else:
- self.__artifact.reset_cached()
+ self.__artifact.query_cache()
# When we're building in non-strict mode, we may have
# assembled everything to this point without a strong cache
@@ -1863,83 +1871,91 @@
# (bool): Whether a pull operation is pending
#
def _pull_pending(self):
- if self._get_workspace():
- # Workspace builds are never pushed to artifact servers
- return False
+ return self.__pull_pending
- # Check whether the pull has been invoked with a specific subdir requested
- # in user context, as to complete a partial artifact
- pull_buildtrees = self._get_context().pull_buildtrees
-
- if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
- if pull_buildtrees:
- # If we've specified a subdir, check if the subdir is cached locally
- # or if it's possible to get
- if self._cached_buildtree() or not self._buildtree_exists():
- return False
- else:
- return False
-
- # Pull is pending if artifact remote server available
- # and pull has not been attempted yet
- return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
-
- # _pull_done()
+ # _load_artifact_done()
#
- # Indicate that pull was attempted.
+ # Indicate that `_load_artifact()` has completed.
#
- # This needs to be called in the main process after a pull
+ # This needs to be called in the main process after `_load_artifact()`
# succeeds or fails so that we properly update the main
# process data model
#
# This will result in updating the element state.
#
- def _pull_done(self):
+ def _load_artifact_done(self):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- self.__pull_done = True
+ assert self.__artifact
- # Artifact may become cached after pulling, so let it query the
- # filesystem again to check
- self.__artifact.reset_cached()
+ context = self._get_context()
- # We may not have actually pulled an artifact - the pull may
- # have failed. We might therefore need to schedule assembly.
- self.__schedule_assembly_when_necessary()
- # If we've finished pulling, an artifact might now exist
- # locally, so we might need to update a non-strict strong
- # cache key.
- self.__update_cache_key_non_strict()
+ if not context.get_strict() and self.__artifact.cached():
+ # In non-strict mode, strong cache key becomes available when
+ # the artifact is cached
+ self.__update_cache_key_non_strict()
+
self._update_ready_for_runtime_and_cached()
- # _pull():
+ self.__schedule_assembly_when_necessary()
+
+ if self.__can_query_cache_callback is not None:
+ self.__can_query_cache_callback(self)
+ self.__can_query_cache_callback = None
+
+ # _load_artifact():
#
- # Pull artifact from remote artifact repository into local artifact cache.
+ # Load artifact from cache or pull it from remote artifact repository.
#
# Returns: True if the artifact has been downloaded, False otherwise
#
- def _pull(self):
+ def _load_artifact(self, *, pull, strict=None):
context = self._get_context()
- # Get optional specific subdir to pull and optional list to not pull
- # based off of user context
- pull_buildtrees = context.pull_buildtrees
+ if strict is None:
+ strict = context.get_strict()
- # Attempt to pull artifact without knowing whether it's available
- strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
- if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
- # Notify successful download
- return True
+ pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
- if not context.get_strict() and not self._cached():
- # In non-strict mode also try pulling weak artifact
- # if no weak artifact is cached yet.
- artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
- return artifact.pull(pull_buildtrees=pull_buildtrees)
- else:
- # No artifact has been downloaded
+ # First check whether we already have the strict artifact in the local cache
+ artifact = Artifact(
+ self,
+ context,
+ strict_key=self.__strict_cache_key,
+ strong_key=self.__strict_cache_key,
+ weak_key=self.__weak_cache_key,
+ )
+ artifact.query_cache()
+
+ self.__pull_pending = False
+ if not pull and not artifact.cached(buildtree=pull_buildtrees):
+ if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace():
+ # Artifact is not completely available in the local cache and an artifact remote server is available.
+ # Stop artifact loading here as pull is required to proceed.
+ self.__pull_pending = True
+
+ # Attempt to pull artifact with the strict cache key
+ pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+ if artifact.cached() or strict:
+ self.__artifact = artifact
+ return pulled
+ elif self.__pull_pending:
return False
+ # In non-strict mode retry with weak cache key
+ artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+ artifact.query_cache()
+
+ # Attempt to pull artifact with the weak cache key
+ pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+ self.__artifact = artifact
+ return pulled
+
+ def _query_source_cache(self):
+ self.__sources.query_cache()
+
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
return True
@@ -2378,7 +2394,7 @@
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if not self.__ready_for_runtime_and_cached:
- if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+ if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success():
self.__ready_for_runtime_and_cached = True
# Notify reverse dependencies
@@ -2595,6 +2611,7 @@
return None
artifact = Artifact(self, self._get_context(), strong_key=workspace.last_build)
+ artifact.query_cache()
if not artifact.cached():
return None
@@ -2862,13 +2879,33 @@
self.__variables.expand(sandbox_config)
self.__sandbox_config = SandboxConfig.new_from_node(sandbox_config, platform=context.platform)
- # __initialize_from_artifact()
+ # __initialize_from_artifact_key()
#
- # Initialize the element state from an Artifact object
+ # Initialize the element state from an artifact key
#
- def __initialize_from_artifact(self, artifact: Artifact):
- self.__artifact = artifact
- self._mimic_artifact()
+ def __initialize_from_artifact_key(self, key: str):
+ # At this point we only know the key which was specified on the command line,
+ # so we will pretend all keys are equal.
+ #
+ # If the artifact is cached, then the real keys will be loaded from the
+ # artifact in `_load_artifact()` and `_load_artifact_done()`.
+ #
+ self.__cache_key = key
+ self.__strict_cache_key = key
+ self.__weak_cache_key = key
+
+ self._initialize_state()
+
+ # ArtifactElement requires access to the artifact early on to walk
+ # dependencies.
+ self._load_artifact(pull=False)
+
+ if not self._cached():
+ # Remotes are not initialized when artifact elements are loaded.
+ # Always consider pull pending if the artifact is not cached.
+ self.__pull_pending = True
+ else:
+ self._load_artifact_done()
@classmethod
def __compose_default_splits(cls, project, defaults, first_pass):
@@ -3244,59 +3281,10 @@
# In strict mode, the strong cache key always matches the strict cache key
self.__cache_key = self.__strict_cache_key
- # If we've newly calculated a cache key, our artifact's
- # current state will also change - after all, we can now find
- # a potential existing artifact.
- self.__update_artifact_state()
-
# Update the message kwargs in use for this plugin to dispatch messages with
#
self._message_kwargs["element_key"] = self._get_display_key()
- # __update_artifact_state()
- #
- # Updates the data involved in knowing about the artifact corresponding
- # to this element.
- #
- # If the state changes, this will subsequently call
- # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
- # possible.
- #
- # Element.__update_cache_keys() must be called before this to have
- # meaningful results, because the element must know its cache key before
- # it can check whether an artifact exists for that cache key.
- #
- def __update_artifact_state(self):
- assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- assert self.__artifact is None
-
- context = self._get_context()
-
- strict_artifact = Artifact(
- self,
- context,
- strong_key=self.__strict_cache_key,
- strict_key=self.__strict_cache_key,
- weak_key=self.__weak_cache_key,
- )
- if context.get_strict() or strict_artifact.cached():
- self.__artifact = strict_artifact
- else:
- self.__artifact = Artifact(
- self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
- )
-
- if not context.get_strict() and self.__artifact.cached():
- # In non-strict mode, strong cache key becomes available when
- # the artifact is cached
- self.__update_cache_key_non_strict()
-
- self.__schedule_assembly_when_necessary()
-
- if self.__can_query_cache_callback is not None:
- self.__can_query_cache_callback(self)
- self.__can_query_cache_callback = None
-
# __update_cache_key_non_strict()
#
# Calculates the strong cache key if it hasn't already been set.
@@ -3305,7 +3293,7 @@
# strict cache key, so no work needs to be done.
#
# When buildstream is not run in strict mode, this requires the artifact
- # state (as set in Element.__update_artifact_state()) to be set accordingly,
+ # state (as set in Element._load_artifact()) to be set accordingly,
# as the cache key can be loaded from the cache (possibly pulling from
# a remote cache).
#
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 095a5f6..927954d 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -21,8 +21,6 @@
import psutil
import pytest
-from buildstream import node, DownloadableFileSource
-
# Number of seconds to wait for background threads to exit.
_AWAIT_THREADS_TIMEOUT_SECONDS = 5
@@ -54,10 +52,3 @@
assert has_no_unexpected_background_threads(default_thread_number)
yield
assert has_no_unexpected_background_threads(default_thread_number)
-
-
-# Reset global state in node.pyx to improve test isolation
-@pytest.fixture(autouse=True)
-def reset_global_node_state():
- node._reset_global_state()
- DownloadableFileSource._reset_url_opener()
diff --git a/src/buildstream/testing/_sourcetests/conftest.py b/src/buildstream/testing/_sourcetests/conftest.py
index 6790712..fead43a 100644
--- a/src/buildstream/testing/_sourcetests/conftest.py
+++ b/src/buildstream/testing/_sourcetests/conftest.py
@@ -14,4 +14,4 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-from .._fixtures import reset_global_node_state, default_thread_number, thread_check # pylint: disable=unused-import
+from .._fixtures import default_thread_number, thread_check # pylint: disable=unused-import
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 858065d..24a1ee1 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -6,7 +6,6 @@
import pytest
from buildstream import _yaml
-from buildstream.types import _Scope
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -33,15 +32,12 @@
# Create a local artifact cache handle
artifactcache = context.artifactcache
- # Ensure the element's artifact memeber is initialised
- # This is duplicated from Pipeline.resolve_elements()
- # as this test does not use the cli frontend.
- for e in element._dependencies(_Scope.ALL):
- e._initialize_state()
-
# Initialize remotes
context.initialize_remotes(True, True, None, None)
+ # Query local cache
+ element._load_artifact(pull=False)
+
assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
assert element._push(), "Push operation failed"
diff --git a/tests/conftest.py b/tests/conftest.py
index d79ad40..e3b13c9 100755
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -27,7 +27,6 @@
from buildstream.testing import register_repo_kind, sourcetests_collection_hook
from buildstream.testing._fixtures import ( # pylint: disable=unused-import
default_thread_number,
- reset_global_node_state,
thread_check,
)
from buildstream.testing.integration import integration_cache # pylint: disable=unused-import
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@
def test_fetch_consistency_error(cli, datafiles):
project = str(datafiles)
- # When the error occurs outside of the scheduler at load time,
- # then the SourceError is reported directly as the main error.
result = cli.run(project=project, args=["source", "fetch", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@
project = str(datafiles)
result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index a32c747..8919330 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -372,8 +372,8 @@
#
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == []
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" not in result.get_pushed_elements()
# Delete the artifact locally again.
cli.remove_artifact_from_cache(project, "target.bst")
@@ -385,8 +385,8 @@
)
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == ["target.bst"]
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" in result.get_pushed_elements()
# Ensure that when an artifact's size exceeds available disk space
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index 3dd686d..950cd83 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@
def test_track_consistency_error(cli, datafiles):
project = str(datafiles)
- # Track the element causing a consistency error
+ # Track the element causing a consistency error in `is_cached()`
result = cli.run(project=project, args=["source", "track", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
def test_track_consistency_bug(cli, datafiles):
project = str(datafiles)
- # Track the element causing an unhandled exception
+ # Track the element causing an unhandled exception in `is_cached()`
result = cli.run(project=project, args=["source", "track", "bug.bst"])
- # We expect BuildStream to fail gracefully, with no recorded exception.
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 86dac0b..26f808d 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -74,7 +74,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -114,9 +114,9 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
# check that we have the source in the cas now and it's not fetched
+ element._query_source_cache()
assert element._cached_sources()
assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
@@ -134,7 +134,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -151,8 +151,11 @@
assert ("SUCCESS Fetching from {}".format(repo.source_config(ref=ref)["url"])) in res.stderr
- # Check that the source in both in the source dir and the local CAS
+ # Check that the source is in both the source dir and the local CAS
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
+
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert element._cached_sources()
@@ -168,7 +171,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -200,7 +203,7 @@
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index edbcfdf..47e845b 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -86,7 +86,7 @@
project.ensure_fully_loaded()
element = project.load_elements(["push.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -136,7 +136,7 @@
project.ensure_fully_loaded()
element = project.load_elements(["push.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..6671c79 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -64,7 +64,7 @@
# now check that the source is in the refs file, this is pretty messy but
# seems to be the only way to get the sources?
element = project.load_elements(["import-bin.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
assert sourcecache.contains(source)
@@ -99,7 +99,7 @@
sourcecache = context.sourcecache
element = project.load_elements(["import-dev.bst"])[0]
- element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
@@ -133,9 +133,9 @@
project.ensure_fully_loaded()
element = project.load_elements(["import-dev.bst"])[0]
- element._initialize_state()
# check consistency of the source
+ element._query_source_cache()
assert not element._cached_sources()
res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- # Track will encounter an inconsistent submodule without any ref
+ # Track to update to the offending commit
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
+ # Fetch after track will encounter an inconsistent submodule without any ref
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+ result.assert_success()
+
# Assert that we are just fine without it, and emit a warning to the user.
assert "Ignoring inconsistent submodule" in result.stderr