blob: ed662dcd29911ffee271b90bdd1ff060646c869e [file] [log] [blame]
# Pylint doesn't play well with fixtures and dependency injection from pytest
# pylint: disable=redefined-outer-name
import os
from shutil import copyfile
import subprocess
import tarfile
import tempfile
import urllib.parse
import pytest
from buildstream import utils
from buildstream.exceptions import ErrorDomain
from buildstream.testing import generate_project, generate_element
from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing._utils.site import HAVE_LZIP
from tests.testutils.file_server import create_file_server
from . import list_dir_contents
DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "tar",)
def _assemble_tar(workingdir, srcdir, dstfile):
old_dir = os.getcwd()
os.chdir(workingdir)
with tarfile.open(dstfile, "w:gz") as tar:
tar.add(srcdir)
os.chdir(old_dir)
def _assemble_tar_lz(workingdir, srcdir, dstfile):
old_dir = os.getcwd()
os.chdir(workingdir)
with tempfile.TemporaryFile() as uncompressed:
with tarfile.open(fileobj=uncompressed, mode="w:") as tar:
tar.add(srcdir)
uncompressed.seek(0, 0)
with open(dstfile, "wb") as dst:
subprocess.call(["lzip"], stdin=uncompressed, stdout=dst)
os.chdir(old_dir)
# Test that without ref, consistency is set appropriately.
@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-ref"))
def test_no_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
assert cli.get_element_state(project, "target.bst") == "no reference"
# Test that when I fetch a nonexistent URL, errors are handled gracefully and a retry is performed.
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_url(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
# Try to fetch it
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
assert "FAILURE Try #" in result.stderr
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that when I fetch with an invalid ref, it fails.
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
# Try to fetch it
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that when tracking with a ref set, there is a warning
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_track_warning(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
# Track it
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
assert "Potential man-in-the-middle attack!" in result.stderr
# Test that a staged checkout matches what was tarred up, with the default first subdir
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_default_basedir(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
original_dir = os.path.join(str(datafiles), "content", "a")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
# Test that a staged checkout matches what was tarred up, with an empty base-dir
@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-basedir"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_no_basedir(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the full content of the tarball is checked out (base-dir: '')
original_dir = os.path.join(str(datafiles), "content")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
# Test that a staged checkout matches what was tarred up, with an explicit basedir
@pytest.mark.datafiles(os.path.join(DATA_DIR, "explicit-basedir"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_explicit_basedir(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
original_dir = os.path.join(str(datafiles), "content", "a")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
# Test that we succeed to extract tarballs with hardlinks when stripping the
# leading paths
@pytest.mark.datafiles(os.path.join(DATA_DIR, "contains-links"))
def test_stage_contains_links(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.gz")
# Create a hardlink, we wont trust git to store that info for us
os.makedirs(os.path.join(str(datafiles), "content", "base-directory", "subdir2"), exist_ok=True)
file1 = os.path.join(str(datafiles), "content", "base-directory", "subdir1", "file.txt")
file2 = os.path.join(str(datafiles), "content", "base-directory", "subdir2", "file.txt")
os.link(file1, file2)
_assemble_tar(os.path.join(str(datafiles), "content"), "base-directory", src_tar)
# Track, fetch, build, checkout
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
original_dir = os.path.join(str(datafiles), "content", "base-directory")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
@pytest.mark.skipif(not HAVE_LZIP, reason="lzip is not available")
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_default_basedir_lzip(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.lz")
_assemble_tar_lz(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
result = cli.run(project=project, args=["source", "track", "target-lz.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target-lz.bst"])
result.assert_success()
result = cli.run(project=project, args=["build", "target-lz.bst"])
result.assert_success()
result = cli.run(project=project, args=["artifact", "checkout", "target-lz.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
original_dir = os.path.join(str(datafiles), "content", "a")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
# Test that tarballs with read-only files work
# a - contains read-only files in a writable directory
# b - root directory has read-only permission
# c - contains one file that has no read nor write permissions. Base-dir set to '' to extract root of tarball
@pytest.mark.datafiles(os.path.join(DATA_DIR, "read-only"))
@pytest.mark.parametrize("tar_name, base_dir", [("a", "*"), ("b", "*"), ("c", "")])
def test_read_only_dir(cli, tmpdir, datafiles, tar_name, base_dir):
try:
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
tar_file = "{}.tar.gz".format(tar_name)
generate_element(
project,
"target.bst",
{
"kind": "import",
"sources": [{"kind": "tar", "url": "tmpdir:/{}".format(tar_file), "ref": "foo", "base-dir": base_dir}],
},
)
# Get the tarball in tests/sources/tar/read-only/content
#
# NOTE that we need to do this because tarfile.open and tar.add()
# are packing the tar up with writeable files and dirs
tarball = os.path.join(str(datafiles), "content", tar_file)
if not os.path.exists(tarball):
raise FileNotFoundError("{} does not exist".format(tarball))
copyfile(tarball, os.path.join(str(tmpdir), tar_file))
# Because this test can potentially leave directories behind
# which are difficult to remove, ask buildstream to use
# our temp directory, so we can clean up.
tmpdir_str = str(tmpdir)
if not tmpdir_str.endswith(os.path.sep):
tmpdir_str += os.path.sep
env = {"TMP": tmpdir_str}
# Track, fetch, build, checkout
result = cli.run(project=project, args=["source", "track", "target.bst"], env=env)
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target.bst"], env=env)
result.assert_success()
result = cli.run(project=project, args=["build", "target.bst"], env=env)
result.assert_success()
finally:
utils._force_rmtree(str(tmpdir))
@pytest.mark.parametrize("server_type", ("FTP", "HTTP"))
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_use_netrc(cli, datafiles, server_type, tmpdir):
file_server_files = os.path.join(str(tmpdir), "file_server")
fake_home = os.path.join(str(tmpdir), "fake_home")
os.makedirs(file_server_files, exist_ok=True)
os.makedirs(fake_home, exist_ok=True)
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
os.environ["HOME"] = fake_home
with open(os.path.join(fake_home, ".netrc"), "wb") as f:
os.fchmod(f.fileno(), 0o700)
f.write(b"machine 127.0.0.1\n")
f.write(b"login testuser\n")
f.write(b"password 12345\n")
with create_file_server(server_type) as server:
server.add_user("testuser", "12345", file_server_files)
generate_project(project, config={"aliases": {"tmpdir": server.base_url()}})
src_tar = os.path.join(file_server_files, "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
server.start()
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
original_dir = os.path.join(str(datafiles), "content", "a")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
@pytest.mark.parametrize("server_type", ("FTP", "HTTP"))
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_netrc_already_specified_user(cli, datafiles, server_type, tmpdir):
file_server_files = os.path.join(str(tmpdir), "file_server")
fake_home = os.path.join(str(tmpdir), "fake_home")
os.makedirs(file_server_files, exist_ok=True)
os.makedirs(fake_home, exist_ok=True)
project = str(datafiles)
os.environ["HOME"] = fake_home
with open(os.path.join(fake_home, ".netrc"), "wb") as f:
os.fchmod(f.fileno(), 0o700)
f.write(b"machine 127.0.0.1\n")
f.write(b"login testuser\n")
f.write(b"password 12345\n")
with create_file_server(server_type) as server:
server.add_user("otheruser", "12345", file_server_files)
parts = urllib.parse.urlsplit(server.base_url())
base_url = urllib.parse.urlunsplit([parts[0], "otheruser@{}".format(parts[1]), *parts[2:]])
generate_project(project, config={"aliases": {"tmpdir": base_url}})
src_tar = os.path.join(file_server_files, "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
server.start()
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that BuildStream doesnt crash if HOME is unset while
# the netrc module is trying to find it's ~/.netrc file.
@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_homeless_environment(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
# Create a local tar
src_tar = os.path.join(str(tmpdir), "a.tar.gz")
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
# Use a track, make sure the plugin tries to find a ~/.netrc
result = cli.run(project=project, args=["source", "track", "target.bst"], env={"HOME": None})
result.assert_success()
@pytest.mark.datafiles(os.path.join(DATA_DIR, "out-of-basedir-hardlinks"))
def test_out_of_basedir_hardlinks(cli, tmpdir, datafiles):
def ensure_link(member):
# By default, python will simply duplicate files - we want
# hardlinks!
if member.path == "contents/to_extract/a":
member.type = tarfile.LNKTYPE
member.linkname = "contents/elsewhere/a"
return member
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create a tarball with an odd hardlink
src_tar = os.path.join(str(tmpdir), "contents.tar.gz")
old_dir = os.getcwd()
os.chdir(str(tmpdir))
with tarfile.open(src_tar, "w:gz") as tar:
# Don't recursively add `contents` as the order is not guaranteed.
# We need to add `elsewhere` before `to_extract` as the latter
# references the former in `linkname`.
tar.add("contents", recursive=False)
tar.add("contents/elsewhere")
tar.add("contents/to_extract", filter=ensure_link)
os.chdir(old_dir)
# Make sure our tarfile is actually created with the desired
# attributes set
with tarfile.open(src_tar, "r:gz") as tar:
assert any(
member.islnk() and member.path == "contents/to_extract/a" and member.linkname == "contents/elsewhere/a"
for member in tar.getmembers()
)
# Assert that we will actually create a singular copy of the file
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
original_dir = os.path.join(str(datafiles), "contents", "to_extract")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
@pytest.mark.datafiles(os.path.join(DATA_DIR, "out-of-basedir-hardlinks"))
def test_malicious_out_of_basedir_hardlinks(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}})
# Create a maliciously-hardlinked tarball
def ensure_link(member):
# By default, python will simply duplicate files - we want
# hardlinks!
if member.path == "contents/elsewhere/malicious":
member.type = tarfile.LNKTYPE
# This should not be allowed
member.linkname = "../../../malicious_target.bst"
return member
src_tar = os.path.join(str(tmpdir), "contents.tar.gz")
old_dir = os.getcwd()
os.chdir(str(tmpdir))
with tarfile.open(src_tar, "w:gz") as tar:
tar.add("contents", filter=ensure_link)
os.chdir(old_dir)
# Make sure our tarfile is actually created with the desired
# attributes set
with tarfile.open(src_tar, "r:gz") as tar:
assert any(
member.islnk()
and member.path == "contents/elsewhere/malicious"
and member.linkname == "../../../malicious_target.bst"
for member in tar.getmembers()
)
# Try to execute the exploit
result = cli.run(project=project, args=["source", "track", "malicious_target.bst"])
result.assert_success()
result = cli.run(project=project, args=["source", "fetch", "malicious_target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)