| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # Authors: |
| # Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> |
| # |
| # Pylint doesn't play well with fixtures and dependency injection from pytest |
| # pylint: disable=redefined-outer-name |
| from contextlib import contextmanager |
| import os |
| import shutil |
| import pytest |
| |
| from buildstream.exceptions import ErrorDomain |
| from buildstream._project import Project |
| from buildstream import _yaml |
| from buildstream._testing import cli # pylint: disable=unused-import |
| from buildstream._testing import create_repo |
| |
| from tests.testutils import create_artifact_share, dummy_context |
| |
| DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project") |
| |
| |
def create_test_element(tmpdir, project_dir):
    """Write a ``fetch.bst`` import element backed by a freshly created tar repo.

    A "tar" repo is created under ``tmpdir`` via ``create_repo`` and
    ``repo.create()`` is called on the project's ``files`` directory; the ref
    it returns is used for the element's single source.

    Args:
        tmpdir: Directory (converted with ``str()``) in which the repo is created.
        project_dir: Path of the project; the element is written to
            ``<project_dir>/elements/fetch.bst``.

    Returns:
        A ``(element_name, repo, ref)`` tuple.
    """
    tar_repo = create_repo("tar", str(tmpdir))
    source_ref = tar_repo.create(os.path.join(project_dir, "files"))

    name = "fetch.bst"
    element_file = os.path.join(project_dir, "elements", name)
    _yaml.roundtrip_dump(
        {"kind": "import", "sources": [tar_repo.source_config(ref=source_ref)]},
        element_file,
    )

    return name, tar_repo, source_ref
| |
| |
@contextmanager
def context_with_source_cache(cli, cache, share, tmpdir):
    """Yield a dummy context whose user config lists ``share`` as a source cache server.

    The same configuration is dumped to ``<tmpdir>/buildstream.conf`` (which
    the dummy context is created from) and applied to ``cli`` with
    ``cli.configure()``.

    Args:
        cli: CLI test fixture to configure.
        cache: Value stored under the ``cachedir`` configuration key.
        share: Object whose ``repo`` attribute is used as the server URL.
        tmpdir: Directory object (supporting ``.join()``) that receives the
            configuration file.

    Yields:
        The context produced by ``dummy_context``.
    """
    source_cache_servers = [{"url": share.repo}]
    user_config = {
        "scheduler": {"pushers": 1},
        "source-caches": {"servers": source_cache_servers},
        "cachedir": cache,
    }

    # Persist the configuration for dummy_context and mirror it onto the CLI
    config_path = str(tmpdir.join("buildstream.conf"))
    _yaml.roundtrip_dump(user_config, file=config_path)
    cli.configure(user_config)

    with dummy_context(config=config_path) as context:
        yield context
| |
| |
@pytest.mark.datafiles(DATA_DIR)
def test_source_fetch(cli, tmpdir, datafiles):
    """Push a source to a remote share, wipe the local caches, and pull it back.

    After the local ``sources`` and ``cas`` directories are removed, a
    ``source fetch`` must report "Pulled source" and leave the element's
    sources cached without repopulating the local tar source directory.
    """
    project_dir = str(datafiles)
    element_name, _repo, _ref = create_test_element(tmpdir, project_dir)

    tmp_root = str(tmpdir)
    cache_dir = os.path.join(tmp_root, "cache")
    tar_sources_dir = os.path.join(cache_dir, "sources", "tar")

    # use artifact cache for sources for now, they should work the same
    with create_artifact_share(os.path.join(tmp_root, "sourceshare")) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            element._query_source_cache()
            assert not element._cached_sources()
            source = list(element.sources())[0]

            # Nothing has been pushed to the share yet
            assert not share.get_source_proto(source._get_source_name())

            # Just check that we sensibly fetch and build the element
            build_result = cli.run(project=project_dir, args=["build", element_name])
            build_result.assert_success()

            # Building must have populated the local tar source directory
            assert os.listdir(tar_sources_dir)

            # get root digest of source
            digest = context.sourcecache.export(source)._get_digest()

            # Push the source to the remote
            push_args = ["source", "push", "--source-remote", share.repo, element_name]
            push_result = cli.run(project=project_dir, args=push_args)
            push_result.assert_success()

            # check the share has the proto and the object
            assert share.get_source_proto(source._get_source_name())
            assert share.has_object(digest)

            # Delete the source locally
            for subdir in ("sources", "cas"):
                shutil.rmtree(os.path.join(cache_dir, subdir))
            assert cli.get_element_state(project_dir, element_name) == "fetch needed"

            # Now fetch the source and check it came from the remote
            fetch_result = cli.run(project=project_dir, args=["source", "fetch", element_name])
            fetch_result.assert_success()
            assert "Pulled source" in fetch_result.stderr

        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]

            # check that we have the source in the cas now and it's not fetched
            element._query_source_cache()
            assert element._cached_sources()
            assert not os.listdir(tar_sources_dir)
| |
| |
@pytest.mark.datafiles(DATA_DIR)
def test_fetch_fallback(cli, tmpdir, datafiles):
    """Check that ``source fetch`` falls back to the source's own fetch method.

    The remote share is empty, so the fetch must log that the remote does not
    have the source cached, then log a successful fetch of the source URL.
    """
    project_dir = str(datafiles)
    element_name, repo, ref = create_test_element(tmpdir, project_dir)

    tmp_root = str(tmpdir)
    cache_dir = os.path.join(tmp_root, "cache")

    # use artifact cache for sources for now, they should work the same
    with create_artifact_share(os.path.join(tmp_root, "sourceshare")) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            element._query_source_cache()
            assert not element._cached_sources()
            source = list(element.sources())[0]

            # Neither the remote nor the local cache knows about the source yet
            assert not share.get_source_proto(source._get_source_name())
            assert not os.path.exists(os.path.join(cache_dir, "sources"))

            # Now check if it falls back to the source fetch method.
            result = cli.run(project=project_dir, args=["source", "fetch", element_name])
            result.assert_success()

            brief_key = source._get_brief_display_key()
            not_cached_msg = "Remote source service ({}) does not have source {} cached".format(
                share.repo, brief_key
            )
            assert not_cached_msg in result.stderr

            fetched_msg = "SUCCESS Fetching {}".format(repo.source_config(ref=ref)["url"])
            assert fetched_msg in result.stderr

            # Reload the element and check its sources are now reported as cached
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            element._query_source_cache()
            assert element._cached_sources()
| |
| |
@pytest.mark.datafiles(DATA_DIR)
def test_pull_fail(cli, tmpdir, datafiles):
    """Check that a build fails when the source is available nowhere.

    Nothing is pushed to the remote share and the upstream repo directory is
    deleted, so the build must fail with a STREAM main error caused by a
    SOURCE task error, after logging that the remote lacks the source.
    """
    project_dir = str(datafiles)
    element_name, repo, _ref = create_test_element(tmpdir, project_dir)

    tmp_root = str(tmpdir)
    cache_dir = os.path.join(tmp_root, "cache")

    with create_artifact_share(os.path.join(tmp_root, "sourceshare")) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            element._query_source_cache()
            assert not element._cached_sources()
            source = list(element.sources())[0]

            # remove files and check that it doesn't build
            shutil.rmtree(repo.repo)

            # Should fail in stream, with a plugin task causing the error
            result = cli.run(project=project_dir, args=["build", element_name])
            result.assert_main_error(ErrorDomain.STREAM, None)
            result.assert_task_error(ErrorDomain.SOURCE, None)

            not_cached_msg = "Remote source service ({}) does not have source {} cached".format(
                share.repo, source._get_brief_display_key()
            )
            assert not_cached_msg in result.stderr
| |
| |
@pytest.mark.datafiles(DATA_DIR)
def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
    """Check the fetch fallback when the remote has the proto but not the objects.

    The source is built and pushed, then the share's CAS ``objects`` directory
    is deleted so the remote can no longer serve the content. After the local
    ``sources`` and ``cas`` directories are wiped, ``source fetch`` must still
    succeed by fetching the source from its original URL.
    """
    project_dir = str(datafiles)
    element_name, repo, ref = create_test_element(tmpdir, project_dir)

    tmp_root = str(tmpdir)
    cache_dir = os.path.join(tmp_root, "cache")
    share_dir = os.path.join(tmp_root, "sourceshare")

    # use artifact cache for sources for now, they should work the same
    with create_artifact_share(share_dir) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            element._query_source_cache()
            assert not element._cached_sources()
            source = list(element.sources())[0]

            # The remote must not know the source yet. Query the *source*
            # proto, as the sibling tests do: looking a source name up as an
            # artifact proto would make this precondition vacuous.
            assert not share.get_source_proto(source._get_source_name())

            # Just check that we sensibly fetch and build the element
            res = cli.run(project=project_dir, args=["build", element_name])
            res.assert_success()

            assert os.listdir(os.path.join(cache_dir, "sources", "tar")) != []

            # get root digest of source
            sourcecache = context.sourcecache
            digest = sourcecache.export(source)._get_digest()

            # Push the source to the remote
            res = cli.run(project=project_dir, args=["source", "push", "--source-remote", share.repo, element_name])
            res.assert_success()

            # Remove the cas content, only keep the proto and such around
            shutil.rmtree(os.path.join(share_dir, "repo", "cas", "objects"))
            # check the share doesn't have the object
            assert not share.has_object(digest)

            # Delete the source locally
            shutil.rmtree(os.path.join(cache_dir, "sources"))
            shutil.rmtree(os.path.join(cache_dir, "cas"))
            state = cli.get_element_state(project_dir, element_name)
            assert state == "fetch needed"

            # Now fetch the source and check that it fell back to the
            # source's own fetch method
            res = cli.run(project=project_dir, args=["source", "fetch", element_name])
            res.assert_success()

            assert ("SUCCESS Fetching {}".format(repo.source_config(ref=ref)["url"])) in res.stderr