import hashlib
import multiprocessing
import os
import signal

import pytest

from buildstream import _yaml, _signals, utils
from buildstream._context import Context
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

from tests.testutils import cli, create_artifact_share


# Project directory
DATA_DIR = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    "project",
)


# Handle messages from the pipeline
def message_handler(message, context):
    pass


# Since parent processes wait for queue events, we need
# to put something on it if the called process raises an
# exception.
def _queue_wrapper(target, queue, *args):
    try:
        target(*args, queue=queue)
    except Exception as e:
        queue.put(str(e))
        raise
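

# Recursively build a Tree message from a Directory: the first call records
# directory as the tree root, then every child Directory it references is
# read back from the local CAS and appended to the tree's children.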
def tree_maker(cas, tree, directory):
    if tree.root.ByteSize() == 0:
        tree.root.CopyFrom(directory)

    for directory_node in directory.directories:
        child_directory = tree.children.add()
        with open(cas.objpath(directory_node.digest), 'rb') as f:
            child_directory.ParseFromString(f.read())

        tree_maker(cas, tree, child_directory)
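

# Check that an artifact which was built and pushed to the share can be
# pulled back into an empty local cache. The pull itself runs in a child
# process so that no gRPC threads are created in the main BuildStream process.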
@pytest.mark.datafiles(DATA_DIR)
def test_pull(cli, tmpdir, datafiles):
    project_dir = str(datafiles)

    # Set up an artifact cache.
    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
        # Configure artifact share
        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
        user_config_file = str(tmpdir.join('buildstream.conf'))
        user_config = {
            'scheduler': {
                'pushers': 1
            },
            'artifacts': {
                'url': share.repo,
                'push': True,
            }
        }

        # Write down the user configuration file
        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
        # Ensure CLI calls will use it
        cli.configure(user_config)

        # First build the project with the artifact cache configured
        result = cli.run(project=project_dir, args=['build', 'target.bst'])
        result.assert_success()

        # Assert that we are now cached locally
        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
        # Assert that we shared/pushed the cached artifact
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert share.has_artifact('test', 'target.bst', element_key)

        # Delete the artifact locally
        cli.remove_artifact_from_cache(project_dir, 'target.bst')

        # Assert that we are not cached locally anymore
        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'

        # Fake minimal context
        context = Context()
        context.load(config=user_config_file)
        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
        context.set_message_handler(message_handler)

        # Load the project and CAS cache
        project = Project(project_dir, context)
        project.ensure_fully_loaded()
        cas = context.artifactcache

        # Assert that the element's artifact is **not** cached
        element = project.load_elements(['target.bst'])[0]
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert not cas.contains(element, element_key)
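
        # The pull helper reports back on this queue: None on success, or an
        # error string if anything went wrong.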
        queue = multiprocessing.Queue()
        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
        process = multiprocessing.Process(target=_queue_wrapper,
                                          args=(_test_pull, queue, user_config_file, project_dir,
                                                artifact_dir, 'target.bst', element_key))

        try:
            # Keep SIGINT blocked in the child process
            with _signals.blocked([signal.SIGINT], ignore=False):
                process.start()

            error = queue.get()
            process.join()
        except KeyboardInterrupt:
            utils._kill_process_tree(process.pid)
            raise

        assert not error
        assert cas.contains(element, element_key)
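

# Child-process half of test_pull: set up a minimal context from the same
# user configuration, pull the element's artifact from the configured remote
# and report the outcome on the queue.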
def _test_pull(user_config_file, project_dir, artifact_dir,
               element_name, element_key, queue):
    # Fake minimal context
    context = Context()
    context.load(config=user_config_file)
    context.artifactdir = artifact_dir
    context.set_message_handler(message_handler)

    # Load the project manually
    project = Project(project_dir, context)
    project.ensure_fully_loaded()

    # Create a local CAS cache handle
    cas = context.artifactcache

    # Load the target element
    element = project.load_elements([element_name])[0]

    # Manually setup the CAS remote
    remotes = cas.get_remotes_from_projects()
    cas.setup_remotes(remotes=remotes)

    if cas.has_push_remotes(element=element):
        # Pull the element's artifact
        if not cas.pull(element, element_key):
            queue.put("Pull operation failed")
        else:
            queue.put(None)
    else:
        queue.put("No remote configured for element {}".format(element_name))
@pytest.mark.datafiles(DATA_DIR)
def test_pull_tree(cli, tmpdir, datafiles):
    project_dir = str(datafiles)

    # Set up an artifact cache.
    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
        # Configure artifact share
        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
        user_config_file = str(tmpdir.join('buildstream.conf'))
        user_config = {
            'scheduler': {
                'pushers': 1
            },
            'artifacts': {
                'url': share.repo,
                'push': True,
            }
        }

        # Write down the user configuration file
        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
        # Ensure CLI calls will use it
        cli.configure(user_config)

        # First build the project with the artifact cache configured
        result = cli.run(project=project_dir, args=['build', 'target.bst'])
        result.assert_success()

        # Assert that we are now cached locally
        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
        # Assert that we shared/pushed the cached artifact
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert share.has_artifact('test', 'target.bst', element_key)

        # Fake minimal context
        context = Context()
        context.load(config=user_config_file)
        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
        context.set_message_handler(message_handler)

        # Load the project and CAS cache
        project = Project(project_dir, context)
        project.ensure_fully_loaded()
        artifactcache = context.artifactcache
        cas = artifactcache.cas

        # Assert that the element's artifact is cached
        element = project.load_elements(['target.bst'])[0]
        element_key = cli.get_element_key(project_dir, 'target.bst')
        assert artifactcache.contains(element, element_key)

        # Retrieve the Directory object from the cached artifact
        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
        artifact_digest = cas.resolve_ref(artifact_ref)
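
        # Have a child process build a Tree for the artifact's root Directory
        # and push it to the share; it reports the Tree digest on the queue.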
        queue = multiprocessing.Queue()
        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
        process = multiprocessing.Process(target=_queue_wrapper,
                                          args=(_test_push_tree, queue, user_config_file, project_dir,
                                                artifact_dir, artifact_digest))

        try:
            # Keep SIGINT blocked in the child process
            with _signals.blocked([signal.SIGINT], ignore=False):
                process.start()

            tree_hash, tree_size = queue.get()
            process.join()
        except KeyboardInterrupt:
            utils._kill_process_tree(process.pid)
            raise

        assert tree_hash and tree_size

        # Now delete the artifact locally
        cli.remove_artifact_from_cache(project_dir, 'target.bst')

        # Assert that we are not cached locally anymore
        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'

        tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
                                                  size_bytes=tree_size)
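
        # Pull the Tree back in a second child process; it reports the digest
        # of the tree's root Directory on the queue.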
        queue = multiprocessing.Queue()
        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
        process = multiprocessing.Process(target=_queue_wrapper,
                                          args=(_test_pull_tree, queue, user_config_file, project_dir,
                                                artifact_dir, tree_digest))

        try:
            # Keep SIGINT blocked in the child process
            with _signals.blocked([signal.SIGINT], ignore=False):
                process.start()

            directory_hash, directory_size = queue.get()
            process.join()
        except KeyboardInterrupt:
            utils._kill_process_tree(process.pid)
            raise

        assert directory_hash and directory_size

        directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
                                                       size_bytes=directory_size)

        # Ensure the entire Tree structure has been pulled
        assert os.path.exists(cas.objpath(directory_digest))
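

# Child-process helper for test_pull_tree: read the artifact's root Directory
# from the local CAS, build the corresponding Tree message and push it to the
# remote, reporting the resulting digest on the queue.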
def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    # Fake minimal context
    context = Context()
    context.load(config=user_config_file)
    context.artifactdir = artifact_dir
    context.set_message_handler(message_handler)

    # Load the project manually
    project = Project(project_dir, context)
    project.ensure_fully_loaded()

    # Create a local CAS cache handle
    artifactcache = context.artifactcache
    cas = artifactcache.cas

    # Manually setup the CAS remote
    remotes = artifactcache.get_remotes_from_projects()
    artifactcache.setup_remotes(remotes=remotes)

    if artifactcache.has_push_remotes():
        directory = remote_execution_pb2.Directory()

        with open(cas.objpath(artifact_digest), 'rb') as f:
            directory.ParseFromString(f.read())

        # Build the Tree object while we are still cached
        tree = remote_execution_pb2.Tree()
        tree_maker(cas, tree, directory)

        # Push the Tree as a regular message
        tree_digest = artifactcache.push_message(project, tree)

        queue.put((tree_digest.hash, tree_digest.size_bytes))
    else:
        queue.put("No remote configured")
def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
    # Fake minimal context
    context = Context()
    context.load(config=user_config_file)
    context.artifactdir = artifact_dir
    context.set_message_handler(message_handler)

    # Load the project manually
    project = Project(project_dir, context)
    project.ensure_fully_loaded()

    # Create a local CAS cache handle
    cas = context.artifactcache

    # Manually setup the CAS remote
    remotes = cas.get_remotes_from_projects()
    cas.setup_remotes(remotes=remotes)

    if cas.has_push_remotes():
        # Pull the artifact using the Tree object
        directory_digest = cas.pull_tree(project, artifact_digest)
        queue.put((directory_digest.hash, directory_digest.size_bytes))
    else:
        queue.put("No remote configured")