Merge pull request #1458 from apache/juerg/build-with-source-push

Query the sources of cached elements in the build pipeline when source push is enabled
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 925ded0..ecff02c 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 from ..._exceptions import SkipJob
 
 
@@ -42,6 +43,10 @@
             return QueueStatus.SKIP
 
     def done(self, _, element, result, status):
+        # Only complete the artifact load when the pull job did not fail.
+        if status is JobStatus.FAIL:
+            return
+
         element._load_artifact_done()
 
     @staticmethod
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2645171..8ff60df 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -195,9 +195,13 @@
     #
     # Args:
     #    elements (list of Element): The elements to check
-    #    sources (bool): True to only query the source cache
+    #    sources_of_cached_elements (bool): True to query the source cache for elements with a cached artifact
+    #    only_sources (bool): True to only query the source cache
     #
-    def query_cache(self, elements, *, sources=False):
+    def query_cache(self, elements, *, sources_of_cached_elements=False, only_sources=False):
+        # It doesn't make sense to combine these flags
+        assert not sources_of_cached_elements or not only_sources
+
         with self._context.messenger.timed_activity("Query cache", silent_nested=True):
             # Enqueue complete build plan as this is required to determine `buildable` status.
             plan = list(_pipeline.dependencies(elements, _Scope.ALL))
@@ -208,9 +212,9 @@
                     # This is the case for artifact elements, which load the
                     # artifact early on.
                     pass
-                elif not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+                elif not only_sources and element._get_cache_key(strength=_KeyStrength.WEAK):
                     element._load_artifact(pull=False)
-                    if not element._can_query_cache() or not element._cached_success():
+                    if sources_of_cached_elements or not element._can_query_cache() or not element._cached_success():
                         element._query_source_cache()
                     if not element._pull_pending():
                         element._load_artifact_done()
@@ -322,7 +326,7 @@
 
         # Ensure we have our sources if we are launching a build shell
         if scope == _Scope.BUILD and not usebuildtree:
-            self.query_cache([element], sources=True)
+            self.query_cache([element], only_sources=True)
             self._fetch([element])
             _pipeline.assert_sources_cached(self._context, [element])
 
@@ -385,7 +389,11 @@
                 for element in self.targets:
                     element._set_artifact_files_required(scope=scope)
 
-        self.query_cache(elements)
+        source_push_enabled = self._sourcecache.has_push_remotes()
+
+        # If source push is enabled, the source cache status of all elements
+        # is required, independent of whether the artifact is already available.
+        self.query_cache(elements, sources_of_cached_elements=source_push_enabled)
 
         # Now construct the queues
         #
@@ -401,7 +409,7 @@
         if self._artifacts.has_push_remotes():
             self._add_queue(ArtifactPushQueue(self._scheduler, skip_uncached=True))
 
-        if self._sourcecache.has_push_remotes():
+        if source_push_enabled:
             self._add_queue(SourcePushQueue(self._scheduler))
 
         # Enqueue elements
@@ -438,7 +446,7 @@
             ignore_project_source_remotes=ignore_project_source_remotes,
         )
 
-        self.query_cache(elements, sources=True)
+        self.query_cache(elements, only_sources=True)
 
         # Delegated to a shared fetch method
         self._fetch(elements, announce_session=True)
@@ -512,7 +520,7 @@
             ignore_project_source_remotes=ignore_project_source_remotes,
         )
 
-        self.query_cache(elements, sources=True)
+        self.query_cache(elements, only_sources=True)
 
         if not self._sourcecache.has_push_remotes():
             raise StreamError("No source caches available for pushing sources")
@@ -897,7 +905,7 @@
         )
 
         # Assert all sources are cached in the source dir
-        self.query_cache(elements, sources=True)
+        self.query_cache(elements, only_sources=True)
         self._fetch(elements)
         _pipeline.assert_sources_cached(self._context, elements)
 
@@ -948,7 +956,7 @@
         # If we're going to checkout, we need at least a fetch,
         #
         if not no_checkout:
-            self.query_cache(elements, sources=True)
+            self.query_cache(elements, only_sources=True)
             self._fetch(elements, fetch_original=True)
 
         expanded_directories = []
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 0cfa812..8a484a2 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1596,9 +1596,7 @@
     def __should_schedule(self):
         # We're processing if we're already scheduled, we've
         # finished assembling or if we're waiting to pull.
-        processing = (
-            self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
-        )
+        processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
 
         # We should schedule a build when
         return (
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 47e845b..7814a7b 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -287,3 +287,35 @@
         res.assert_success()
         assert "fetch:{}".format(element_name) in res.stderr
         assert "Pushed source" in res.stderr
+
+
+# Regression test for https://github.com/apache/buildstream/issues/1456
+# Test that a build pipeline with source push enabled doesn't fail when
+# the element's artifact is already cached.
+@pytest.mark.datafiles(DATA_DIR)
+def test_build_push_source_twice(cli, tmpdir, datafiles):
+    cache_dir = os.path.join(str(tmpdir), "cache")
+    project_dir = str(datafiles)
+    element_name = "import-bin.bst"
+
+    with create_artifact_share(os.path.join(str(tmpdir), "sourceshare")) as share:
+        user_config_file = str(tmpdir.join("buildstream.conf"))
+        user_config = {
+            "scheduler": {"pushers": 1},
+            "source-caches": {"servers": [{"url": share.repo, "push": True,}]},
+            "cachedir": cache_dir,
+        }
+        _yaml.roundtrip_dump(user_config, file=user_config_file)
+        cli.configure(user_config)
+
+        res = cli.run(project=project_dir, args=["build", element_name])
+        res.assert_success()
+        assert "fetch:{}".format(element_name) in res.stderr
+        assert "Pushed source" in res.stderr
+
+        # The second build is a no-op because artifact and sources are cached;
+        # it must still succeed, without fetching or pushing the sources again.
+        res = cli.run(project=project_dir, args=["build", element_name])
+        res.assert_success()
+        assert "fetch:{}".format(element_name) not in res.stderr
+        assert "Pushed source" not in res.stderr