#!/usr/bin/env python3
#
# Copyright (C) 2017-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import errno
import multiprocessing
import os
import signal
import string
import tempfile
from .. import _ostree, _signals, utils
from .._exceptions import ArtifactError
from .._ostree import OSTreeError
from . import ArtifactCache
from .pushreceive import initialize_push_connection
from .pushreceive import push as push_artifact
from .pushreceive import PushException
def buildref(element, key):
project = element._get_project()
# Normalize ostree ref unsupported chars
valid_chars = string.digits + string.ascii_letters + '-._'
element_name = ''.join([
x if x in valid_chars else '_'
for x in element.normal_name
])
assert key is not None
# assume project and element names are not allowed to contain slashes
return '{0}/{1}/{2}'.format(project.name, element_name, key)
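# Illustrative example (hypothetical names, not from this module): for a
# project named 'myproject' and an element whose normal_name is
# 'base/alpine.bst', a cache key of 'f2a9...' would produce the ref:
#
#   myproject/base_alpine.bst/f2a9...
#
# The '/' in the element name is replaced by '_' because it is not in the
# set of characters we allow in that part of the ref.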
# Represents a single remote OSTree cache.
#
class _OSTreeRemote():
def __init__(self, spec, pull_url, push_url):
self.spec = spec
self.pull_url = pull_url
self.push_url = push_url
# Maps artifacts to the remotes that contain them.
#
class _OSTreeArtifactMap():
def __init__(self):
self._ref_to_remotes = {}
def append(self, ref, remote):
if ref in self._ref_to_remotes:
self._ref_to_remotes[ref].append(remote)
else:
self._ref_to_remotes[ref] = [remote]
def lookup(self, ref):
return self._ref_to_remotes.get(ref, [])
def lookup_first(self, ref):
return self._ref_to_remotes.get(ref, [])[0]
def contains(self, ref):
return ref in self._ref_to_remotes
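# A minimal usage sketch (hypothetical remotes r1 and r2, with r1 registered
# first and therefore treated as the higher priority remote):
#
#   amap = _OSTreeArtifactMap()
#   amap.append('proj/elem/key', r1)
#   amap.append('proj/elem/key', r2)
#   amap.lookup('proj/elem/key')        # -> [r1, r2]
#   amap.lookup_first('proj/elem/key')  # -> r1
#   amap.contains('proj/elem/other')    # -> False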
# An OSTreeCache manages artifacts in an OSTree repository
#
# Args:
# context (Context): The BuildStream context
# project (Project): The BuildStream project
# enable_push (bool): Whether pushing is allowed by the platform
#
# Pushing is explicitly disabled by the platform in some cases,
# like when we are falling back to functioning without using
# user namespaces.
#
class OSTreeCache(ArtifactCache):
def __init__(self, context, *, enable_push):
super().__init__(context)
self.enable_push = enable_push
ostreedir = os.path.join(context.artifactdir, 'ostree')
self.repo = _ostree.ensure(ostreedir, False)
# Per-project list of OSTreeRemote and OSTreeArtifactMap instances.
self._remotes = {}
self._artifact_maps = {}
self._has_fetch_remotes = False
self._has_push_remotes = False
def has_fetch_remotes(self):
return self._has_fetch_remotes
def has_push_remotes(self, *, element=None):
if not self._has_push_remotes:
# No project has push remotes
return False
elif element is None:
# At least one (sub)project has push remotes
return True
else:
# Check whether the specified element's project has push remotes
remotes_for_project = self._remotes[element._get_project()]
return any(remote.spec.push for remote in remotes_for_project)
# contains():
#
# Check whether the artifact for the specified Element is already available
# in the local artifact cache.
#
# Args:
# element (Element): The Element to check
# key (str): The cache key to use
#
# Returns: True if the artifact is in the cache, False otherwise
#
def contains(self, element, key):
ref = buildref(element, key)
return _ostree.exists(self.repo, ref)
# remotes_containing_key():
#
# Return every remote cache that contains the key. The result is a list of
# remotes in priority order (highest priority first).
#
# Args:
# element (Element): The Element to check
# key (str): The key to use
#
# Returns (list): A list of _OSTreeRemote instances.
#
def remotes_containing_key(self, element, key):
if not self._has_fetch_remotes:
return []
artifact_map = self._artifact_maps[element._get_project()]
ref = buildref(element, key)
return artifact_map.lookup(ref)
# remote_contains():
#
# Check whether the artifact for the specified Element is already available
# in the remote artifact cache.
#
# Args:
# element (Element): The Element to check
# key (str): The cache key to use
#
# Returns: True if the artifact is in a cache, False otherwise
#
def remote_contains(self, element, key):
remotes = self.remotes_containing_key(element, key)
return len(remotes) > 0
# push_needed():
#
# Check whether an artifact for the specified Element needs to be pushed to
# any of the configured push remotes. The policy is to push every artifact
# we build to every configured push remote, so this should only return False
# if all of the configured push remotes already contain the given artifact.
#
# This function checks for presence of the artifact only using its strong
# key. The presence of the weak key in a cache does not necessarily indicate
# that this particular artifact is present, only that there is a
# partially-compatible version available.
#
# Args:
# element (Element): The Element to check
# key (str): The cache key to use
#
# Returns: False if all the push remotes have the artifact, True otherwise
#
def push_needed(self, element, key):
remotes_with_artifact = self.remotes_containing_key(element, key)
push_remotes_with_artifact = set(r for r in remotes_with_artifact if r.spec.push)
push_remotes_for_project = set(r for r in self._remotes[element._get_project()] if r.spec.push)
return not push_remotes_for_project.issubset(push_remotes_with_artifact)
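# For example (hypothetical remotes): if the project has push remotes
# {A, B} and only A already contains the artifact, the subset check
# {A, B} <= {A} fails and push_needed() returns True; once B has the
# artifact as well, {A, B} <= {A, B} holds and push_needed() returns False.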
# extract():
#
# Extract cached artifact for the specified Element if it hasn't
# already been extracted.
#
# Assumes artifact has previously been fetched or committed.
#
# Args:
# element (Element): The Element to extract
# key (str): The cache key to use
#
# Raises:
# ArtifactError: In case there was an OSError, or if the artifact
# did not exist.
#
# Returns: path to extracted artifact
#
def extract(self, element, key):
ref = buildref(element, key)
# resolve ref to checksum
rev = _ostree.checksum(self.repo, ref)
if not rev:
raise ArtifactError("Artifact missing for {}".format(ref))
dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev)
if os.path.isdir(dest):
# artifact has already been extracted
return dest
os.makedirs(self.extractdir, exist_ok=True)
with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
checkoutdir = os.path.join(tmpdir, ref)
_ostree.checkout(self.repo, checkoutdir, rev, user=True)
os.makedirs(os.path.dirname(dest), exist_ok=True)
try:
os.rename(checkoutdir, dest)
except OSError as e:
# With rename, it's possible to get either ENOTEMPTY or EEXIST
# in the case that the destination path is a non-empty directory.
#
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
raise ArtifactError("Failed to extract artifact for ref '{}': {}"
.format(ref, e)) from e
return dest
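# For illustration, the extracted artifact ends up at a path of the form:
#
#   <extractdir>/<project name>/<element normal_name>/<resolved checksum>
#
# so repeated extract() calls for the same artifact can return the existing
# directory without checking it out again.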
# commit():
#
# Commit built artifact to cache.
#
# Args:
# element (Element): The Element to commit an artifact for
# content (str): The element's content directory
# keys (list): The cache keys to use
#
def commit(self, element, content, keys):
refs = [buildref(element, key) for key in keys]
try:
_ostree.commit(self.repo, content, refs)
except OSTreeError as e:
raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
# pull():
#
# Pull artifact from one of the configured remote repositories.
#
# Args:
# element (Element): The Element whose artifact is to be fetched
# key (str): The cache key to use
# progress (callable): The progress callback, if any
#
def pull(self, element, key, *, progress=None):
project = element._get_project()
artifact_map = self._artifact_maps[project]
ref = buildref(element, key)
try:
# fetch the artifact from the highest priority remote using the specified cache key
remote = artifact_map.lookup_first(ref)
remote_name = self._ensure_remote(self.repo, remote.pull_url)
_ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
except OSTreeError as e:
raise ArtifactError("Failed to pull artifact for element {}: {}"
.format(element.name, e)) from e
# link_key():
#
# Add a key for an existing artifact.
#
# Args:
# element (Element): The Element whose artifact is to be linked
# oldkey (str): An existing cache key for the artifact
# newkey (str): A new cache key for the artifact
#
def link_key(self, element, oldkey, newkey):
oldref = buildref(element, oldkey)
newref = buildref(element, newkey)
# resolve ref to checksum
rev = _ostree.checksum(self.repo, oldref)
# create additional ref for the same checksum
_ostree.set_ref(self.repo, newref, rev)
# push():
#
# Push committed artifact to remote repository.
#
# Args:
# element (Element): The Element whose artifact is to be pushed
# keys (list): The cache keys to use
#
# Returns:
# (bool): True if any remote was updated, False if no pushes were required
#
# Raises:
# (ArtifactError): if there was an error
def push(self, element, keys):
any_pushed = False
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]
if not push_remotes:
raise ArtifactError("Push is not enabled for any of the configured remote artifact caches.")
refs = [buildref(element, key) for key in keys]
remotes_for_each_ref = [self.remotes_containing_key(element, key) for key in keys]
for remote in push_remotes:
# Push if the remote is missing any of the refs
if any([remote not in remotes_with_ref for remotes_with_ref in remotes_for_each_ref]):
any_pushed |= self._push_to_remote(remote, element, refs)
return any_pushed
# _initialize_remote():
#
# Do protocol-specific initialization necessary to use a given OSTree
# remote.
#
# The SSH protocol that we use only supports pushing, so initializing these
# involves contacting the remote to find out the corresponding pull URL.
#
# Args:
# url (str): URL of the remote
#
# Returns:
# (str, str): the pull URL and push URL for the remote
#
# Raises:
# ArtifactError: if there was an error
def _initialize_remote(self, url):
if url.startswith('ssh://'):
try:
push_url = url
pull_url = initialize_push_connection(url)
except PushException as e:
raise ArtifactError(e) from e
elif url.startswith('/'):
push_url = pull_url = 'file://' + url
elif url.startswith('file://'):
push_url = pull_url = url
elif url.startswith('http://') or url.startswith('https://'):
push_url = None
pull_url = url
else:
raise ArtifactError("Unsupported URL: {}".format(url))
return push_url, pull_url
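# Some illustrative mappings (hypothetical URLs), in (push_url, pull_url) order:
#
#   'ssh://user@cache.example.com/artifacts'
#       -> (the ssh:// URL itself, whatever pull URL the remote reports)
#   '/srv/artifacts'
#       -> ('file:///srv/artifacts', 'file:///srv/artifacts')
#   'https://cache.example.com/artifacts'
#       -> (None, 'https://cache.example.com/artifacts'), as HTTP(S) remotes
#          are pull-only here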
# _ensure_remote():
#
# Ensure that our OSTree repo has a remote configured for the given URL.
# Note that SSH access to remotes is not handled by libostree itself.
#
# Args:
# repo (OSTree.Repo): an OSTree repository
# pull_url (str): the URL where libostree can pull from the remote
#
# Returns:
# (str): the name of the remote, which can be passed to various other
# operations implemented by the _ostree module.
#
# Raises:
# OSTreeError: if there was a problem reported by libostree
def _ensure_remote(self, repo, pull_url):
remote_name = utils.url_directory_name(pull_url)
_ostree.configure_remote(repo, remote_name, pull_url)
return remote_name
def initialize_remotes(self, *, on_failure=None):
remote_specs = list(self.global_remote_specs)
for project in self.project_remote_specs:
remote_specs += self.project_remote_specs[project]
remote_specs = list(utils._deduplicate(remote_specs))
remote_results = {}
# Callback to initialize one remote in a 'multiprocessing' subprocess.
#
# We cannot do this in the main process because of the way the tasks
# run by the main scheduler call into libostree using
# fork()-without-exec() subprocesses. OSTree fetch operations in
# subprocesses hang if fetch operations were previously done in the
# main process.
#
def child_action(url, q):
try:
push_url, pull_url = self._initialize_remote(url)
remote = self._ensure_remote(self.repo, pull_url)
remote_refs = _ostree.list_remote_refs(self.repo, remote=remote)
q.put((None, push_url, pull_url, remote_refs))
except Exception as e: # pylint: disable=broad-except
# Whatever happens, we need to return it to the calling process
#
q.put((str(e), None, None, None))
# Kick off all the initialization jobs one by one.
#
# Note that we cannot use multiprocessing.Pool here because it's not
# possible to pickle local functions such as child_action().
#
q = multiprocessing.Queue()
for remote_spec in remote_specs:
p = multiprocessing.Process(target=child_action, args=(remote_spec.url, q))
try:
# Keep SIGINT blocked in the child process
with _signals.blocked([signal.SIGINT], ignore=False):
p.start()
error, push_url, pull_url, remote_refs = q.get()
p.join()
except KeyboardInterrupt:
utils._kill_process_tree(p.pid)
raise
if error and on_failure:
on_failure(remote_spec.url, error)
elif error:
raise ArtifactError(error)
else:
if remote_spec.push and push_url:
self._has_push_remotes = True
if pull_url:
self._has_fetch_remotes = True
remote_results[remote_spec.url] = (push_url, pull_url, remote_refs)
# Prepare push_urls, pull_urls, and remote_refs for each project
for project in self.context._get_projects():
remote_specs = self.global_remote_specs
if project in self.project_remote_specs:
remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
remotes = []
artifact_map = _OSTreeArtifactMap()
for remote_spec in remote_specs:
# Errors are already handled in the loop above,
# skip unreachable remotes here.
if remote_spec.url not in remote_results:
continue
push_url, pull_url, remote_refs = remote_results[remote_spec.url]
if remote_spec.push and not push_url:
raise ArtifactError("Push enabled but not supported by repo at: {}".format(remote_spec.url))
remote = _OSTreeRemote(remote_spec, pull_url, push_url)
remotes.append(remote)
# Add every ref advertised by this remote to the artifact map.
# Remotes are processed in priority order, so for any given ref the
# highest priority remote that has it appears first in the map.
for ref in remote_refs:
artifact_map.append(ref, remote)
self._artifact_maps[project] = artifact_map
self._remotes[project] = remotes
def _push_to_remote(self, remote, element, refs):
with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir:
with element.timed_activity("Preparing compressed archive"):
# First create a temporary archive-z2 repository, we can
# only use ostree-push with archive-z2 local repo.
temp_repo = _ostree.ensure(temp_repo_dir, True)
# Now push the ref we want to push into our temporary archive-z2 repo
for ref in refs:
_ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref)
with element.timed_activity("Sending artifact"), \
element._output_file() as output_file:
try:
pushed = push_artifact(temp_repo.get_path().get_path(),
remote.push_url,
refs, output_file)
except PushException as e:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e)) from e
return pushed