Merge pull request #1458 from apache/juerg/build-with-source-push
Query sources of cached elements in build pipeline with source push enabled
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 925ded0..ecff02c 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..jobs import JobStatus
from ..._exceptions import SkipJob
@@ -42,6 +43,10 @@
return QueueStatus.SKIP
def done(self, _, element, result, status):
+
+ if status is JobStatus.FAIL:
+ return
+
element._load_artifact_done()
@staticmethod
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2645171..8ff60df 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -195,9 +195,13 @@
#
# Args:
# elements (list of Element): The elements to check
- # sources (bool): True to only query the source cache
+ # sources_of_cached_elements (bool): True to query the source cache for elements with a cached artifact
+ # only_sources (bool): True to only query the source cache
#
- def query_cache(self, elements, *, sources=False):
+ def query_cache(self, elements, *, sources_of_cached_elements=False, only_sources=False):
+ # It doesn't make sense to combine these flags
+ assert not sources_of_cached_elements or not only_sources
+
with self._context.messenger.timed_activity("Query cache", silent_nested=True):
# Enqueue complete build plan as this is required to determine `buildable` status.
plan = list(_pipeline.dependencies(elements, _Scope.ALL))
@@ -208,9 +212,9 @@
# This is the case for artifact elements, which load the
# artifact early on.
pass
- elif not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+ elif not only_sources and element._get_cache_key(strength=_KeyStrength.WEAK):
element._load_artifact(pull=False)
- if not element._can_query_cache() or not element._cached_success():
+ if sources_of_cached_elements or not element._can_query_cache() or not element._cached_success():
element._query_source_cache()
if not element._pull_pending():
element._load_artifact_done()
@@ -322,7 +326,7 @@
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
- self.query_cache([element], sources=True)
+ self.query_cache([element], only_sources=True)
self._fetch([element])
_pipeline.assert_sources_cached(self._context, [element])
@@ -385,7 +389,11 @@
for element in self.targets:
element._set_artifact_files_required(scope=scope)
- self.query_cache(elements)
+ source_push_enabled = self._sourcecache.has_push_remotes()
+
+ # If source push is enabled, the source cache status of all elements
+ # is required, regardless of whether the artifact is already available.
+ self.query_cache(elements, sources_of_cached_elements=source_push_enabled)
# Now construct the queues
#
@@ -401,7 +409,7 @@
if self._artifacts.has_push_remotes():
self._add_queue(ArtifactPushQueue(self._scheduler, skip_uncached=True))
- if self._sourcecache.has_push_remotes():
+ if source_push_enabled:
self._add_queue(SourcePushQueue(self._scheduler))
# Enqueue elements
@@ -438,7 +446,7 @@
ignore_project_source_remotes=ignore_project_source_remotes,
)
- self.query_cache(elements, sources=True)
+ self.query_cache(elements, only_sources=True)
# Delegated to a shared fetch method
self._fetch(elements, announce_session=True)
@@ -512,7 +520,7 @@
ignore_project_source_remotes=ignore_project_source_remotes,
)
- self.query_cache(elements, sources=True)
+ self.query_cache(elements, only_sources=True)
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
@@ -897,7 +905,7 @@
)
# Assert all sources are cached in the source dir
- self.query_cache(elements, sources=True)
+ self.query_cache(elements, only_sources=True)
self._fetch(elements)
_pipeline.assert_sources_cached(self._context, elements)
@@ -948,7 +956,7 @@
# If we're going to checkout, we need at least a fetch,
#
if not no_checkout:
- self.query_cache(elements, sources=True)
+ self.query_cache(elements, only_sources=True)
self._fetch(elements, fetch_original=True)
expanded_directories = []
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 0cfa812..8a484a2 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1596,9 +1596,7 @@
def __should_schedule(self):
# We're processing if we're already scheduled, we've
# finished assembling or if we're waiting to pull.
- processing = (
- self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
- )
+ processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
# We should schedule a build when
return (
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 47e845b..7814a7b 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -287,3 +287,35 @@
res.assert_success()
assert "fetch:{}".format(element_name) in res.stderr
assert "Pushed source" in res.stderr
+
+
+# Regression test for https://github.com/apache/buildstream/issues/1456
+# Test that a build pipeline with source push enabled doesn't fail if an
+# element is already cached.
+@pytest.mark.datafiles(DATA_DIR)
+def test_build_push_source_twice(cli, tmpdir, datafiles):
+ cache_dir = os.path.join(str(tmpdir), "cache")
+ project_dir = str(datafiles)
+ element_name = "import-bin.bst"
+
+ with create_artifact_share(os.path.join(str(tmpdir), "sourceshare")) as share:
+ user_config_file = str(tmpdir.join("buildstream.conf"))
+ user_config = {
+ "scheduler": {"pushers": 1},
+ "source-caches": {"servers": [{"url": share.repo, "push": True,}]},
+ "cachedir": cache_dir,
+ }
+ _yaml.roundtrip_dump(user_config, file=user_config_file)
+ cli.configure(user_config)
+
+ res = cli.run(project=project_dir, args=["build", element_name])
+ res.assert_success()
+ assert "fetch:{}".format(element_name) in res.stderr
+ assert "Pushed source" in res.stderr
+
+ # The second build pipeline is a no-op as everything is already cached.
+ # However, this verifies that the no-op pipeline completes successfully.
+ res = cli.run(project=project_dir, args=["build", element_name])
+ res.assert_success()
+ assert "fetch:{}".format(element_name) not in res.stderr
+ assert "Pushed source" not in res.stderr