#
# Copyright (C) 2017-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import errno
import multiprocessing
import os
import signal
import tempfile
from .. import _ostree, _signals, utils
from .._exceptions import ArtifactError
from .._ostree import OSTreeError
from . import ArtifactCache
from .pushreceive import initialize_push_connection
from .pushreceive import push as push_artifact
from .pushreceive import PushException
# An OSTreeCache manages artifacts in an OSTree repository
#
# Args:
# context (Context): The BuildStream context
# project (Project): The BuildStream project
# enable_push (bool): Whether pushing is allowed by the platform
#
# Pushing is explicitly disabled by the platform in some cases,
# like when we are falling back to functioning without using
# user namespaces.
#
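# An illustrative sketch of how the cache might be driven (the names
# 'context', 'element' and 'key' are placeholders; in practice the cache
# is constructed and called by the core, not used directly like this):
#
#     cache = OSTreeCache(context, enable_push=True)
#     cache.initialize_remotes()
#     if cache.has_fetch_remotes(element=element) and not cache.contains(element, key):
#         cache.pull(element, key)
#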
class OSTreeCache(ArtifactCache):
def __init__(self, context, *, enable_push):
super().__init__(context)
self.enable_push = enable_push
ostreedir = os.path.join(context.artifactdir, 'ostree')
self.repo = _ostree.ensure(ostreedir, False)
# Per-project list of OSTreeRemote instances.
self._remotes = {}
self._has_fetch_remotes = False
self._has_push_remotes = False
################################################
# Implementation of abstract methods #
################################################
def has_fetch_remotes(self, *, element=None):
if not self._has_fetch_remotes:
# No project has fetch remotes
return False
elif element is None:
# At least one (sub)project has fetch remotes
return True
else:
# Check whether the specified element's project has fetch remotes
remotes_for_project = self._remotes[element._get_project()]
return bool(remotes_for_project)
def has_push_remotes(self, *, element=None):
if not self._has_push_remotes:
# No project has push remotes
return False
elif element is None:
# At least one (sub)project has push remotes
return True
else:
# Check whether the specified element's project has push remotes
remotes_for_project = self._remotes[element._get_project()]
return any(remote.spec.push for remote in remotes_for_project)
def contains(self, element, key):
ref = self.get_artifact_fullname(element, key)
return _ostree.exists(self.repo, ref)
def extract(self, element, key):
ref = self.get_artifact_fullname(element, key)
# resolve ref to checksum
rev = _ostree.checksum(self.repo, ref)
# Extracting a nonexistent artifact is a bug
assert rev, "Artifact missing for {}".format(ref)
dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev)
if os.path.isdir(dest):
# artifact has already been extracted
return dest
os.makedirs(self.extractdir, exist_ok=True)
with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
checkoutdir = os.path.join(tmpdir, ref)
_ostree.checkout(self.repo, checkoutdir, rev, user=True)
os.makedirs(os.path.dirname(dest), exist_ok=True)
try:
os.rename(checkoutdir, dest)
except OSError as e:
# With rename, it's possible to get either ENOTEMPTY or EEXIST
# if the destination path is a non-empty directory.
#
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
raise ArtifactError("Failed to extract artifact for ref '{}': {}"
.format(ref, e)) from e
return dest
def commit(self, element, content, keys):
refs = [self.get_artifact_fullname(element, key) for key in keys]
try:
_ostree.commit(self.repo, content, refs)
except OSTreeError as e:
raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
def can_diff(self):
return True
def diff(self, element, key_a, key_b, *, subdir=None):
_, a, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_a))
_, b, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_b))
if subdir:
a = a.get_child(subdir)
b = b.get_child(subdir)
subpath = a.get_path()
else:
subpath = '/'
modified, removed, added = _ostree.diff_dirs(a, b)
modified = [os.path.relpath(item.target.get_path(), subpath) for item in modified]
removed = [os.path.relpath(item.get_path(), subpath) for item in removed]
added = [os.path.relpath(item.get_path(), subpath) for item in added]
return modified, removed, added
def pull(self, element, key, *, progress=None):
project = element._get_project()
ref = self.get_artifact_fullname(element, key)
for remote in self._remotes[project]:
try:
# fetch the artifact from highest priority remote using the specified cache key
remote_name = self._ensure_remote(self.repo, remote.pull_url)
_ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
element.info("Pulled artifact {} from {}".format(ref, remote.pull_url))
return True
except OSTreeError:
# Try next remote
continue
return False
def link_key(self, element, oldkey, newkey):
oldref = self.get_artifact_fullname(element, oldkey)
newref = self.get_artifact_fullname(element, newkey)
# resolve ref to checksum
rev = _ostree.checksum(self.repo, oldref)
# create additional ref for the same checksum
_ostree.set_ref(self.repo, newref, rev)
def push(self, element, keys):
any_pushed = False
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]
if not push_remotes:
raise ArtifactError("Push is not enabled for any of the configured remote artifact caches.")
refs = [self.get_artifact_fullname(element, key) for key in keys]
for remote in push_remotes:
any_pushed |= self._push_to_remote(remote, element, refs)
return any_pushed
def initialize_remotes(self, *, on_failure=None):
remote_specs = self.global_remote_specs.copy()
for project in self.project_remote_specs:
remote_specs.extend(self.project_remote_specs[project])
remote_specs = list(utils._deduplicate(remote_specs))
remote_results = {}
# Callback to initialize one remote in a 'multiprocessing' subprocess.
#
# We cannot do this in the main process because the tasks run by the
# main scheduler call into libostree from fork()-without-exec()
# subprocesses. OSTree fetch operations in
# subprocesses hang if fetch operations were previously done in the
# main process.
#
def child_action(url, q):
try:
push_url, pull_url = self._initialize_remote(url)
q.put((None, push_url, pull_url))
except Exception as e: # pylint: disable=broad-except
# Whatever happens, we need to return it to the calling process
#
q.put((str(e), None, None))
# Kick off all the initialization jobs one by one.
#
# Note that we cannot use multiprocessing.Pool here because it's not
# possible to pickle local functions such as child_action().
#
q = multiprocessing.Queue()
for remote_spec in remote_specs:
p = multiprocessing.Process(target=child_action, args=(remote_spec.url, q))
try:
# Keep SIGINT blocked in the child process
with _signals.blocked([signal.SIGINT], ignore=False):
p.start()
error, push_url, pull_url = q.get()
p.join()
except KeyboardInterrupt:
utils._kill_process_tree(p.pid)
raise
if error and on_failure:
on_failure(remote_spec.url, error)
elif error:
raise ArtifactError(error)
else:
if remote_spec.push and push_url:
self._has_push_remotes = True
if pull_url:
self._has_fetch_remotes = True
remote_results[remote_spec.url] = (push_url, pull_url)
# Prepare the per-project lists of remotes, with their resolved push and pull URLs
for project in self.context.get_projects():
remote_specs = self.global_remote_specs
if project in self.project_remote_specs:
remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
remotes = []
for remote_spec in remote_specs:
# Errors are already handled in the loop above,
# skip unreachable remotes here.
if remote_spec.url not in remote_results:
continue
push_url, pull_url = remote_results[remote_spec.url]
if remote_spec.push and not push_url:
raise ArtifactError("Push enabled but not supported by repo at: {}".format(remote_spec.url))
remote = _OSTreeRemote(remote_spec, pull_url, push_url)
remotes.append(remote)
self._remotes[project] = remotes
################################################
# Local Private Methods #
################################################
# _initialize_remote():
#
# Do protocol-specific initialization necessary to use a given OSTree
# remote.
#
# The SSH protocol that we use only supports pushing, so initializing an
# ssh:// remote involves contacting it to find out the corresponding pull URL.
#
# Args:
# url (str): URL of the remote
#
# Returns:
# (str, str): the push URL and pull URL for the remote
#
# Raises:
# ArtifactError: if there was an error
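#
# Illustrative examples of the URL handling below (hypothetical URLs):
#
#     'ssh://user@host/cache' -> push over SSH; the pull URL is reported
#                                by the remote when the connection is set up
#     '/srv/artifacts'        -> push and pull via 'file:///srv/artifacts'
#     'https://host/cache'    -> pull only; the push URL is None
#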
def _initialize_remote(self, url):
if url.startswith('ssh://'):
try:
push_url = url
pull_url = initialize_push_connection(url)
except PushException as e:
raise ArtifactError(e) from e
elif url.startswith('/'):
push_url = pull_url = 'file://' + url
elif url.startswith('file://'):
push_url = pull_url = url
elif url.startswith('http://') or url.startswith('https://'):
push_url = None
pull_url = url
else:
raise ArtifactError("Unsupported URL: {}".format(url))
return push_url, pull_url
# _ensure_remote():
#
# Ensure that our OSTree repo has a remote configured for the given URL.
# Note that SSH access to remotes is not handled by libostree itself.
#
# Args:
# repo (OSTree.Repo): an OSTree repository
# pull_url (str): the URL where libostree can pull from the remote
#
# Returns:
# (str): the name of the remote, which can be passed to various other
# operations implemented by the _ostree module.
#
# Raises:
# OSTreeError: if there was a problem reported by libostree
def _ensure_remote(self, repo, pull_url):
remote_name = utils.url_directory_name(pull_url)
_ostree.configure_remote(repo, remote_name, pull_url)
return remote_name
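# _push_to_remote():
#
# Push the given refs to a single remote using the ostree-push protocol.
#
# The refs are first pulled from the local repository into a temporary
# archive-z2 repository, which is then pushed to the remote.
#
# Args:
#    remote (_OSTreeRemote): the remote to push to
#    element (Element): the element whose artifact is being pushed
#    refs (list): the refs to push
#
# Returns:
#    (bool): True if any refs were actually pushed to the remote
#
# Raises:
#    ArtifactError: if the push failed
#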
def _push_to_remote(self, remote, element, refs):
with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir:
with element.timed_activity("Preparing compressed archive"):
# First create a temporary archive-z2 repository, as ostree-push
# can only be used with a local archive-z2 repo.
temp_repo = _ostree.ensure(temp_repo_dir, True)
# Now pull the refs we want to push from the local repo into the
# temporary archive-z2 repo
for ref in refs:
_ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref)
with element.timed_activity("Sending artifact"), \
element._output_file() as output_file:
element.info("Pushing artifact {} to {}".format(element.name, remote.push_url))
try:
pushed = push_artifact(temp_repo.get_path().get_path(),
remote.push_url,
refs, output_file)
except PushException as e:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e)) from e
return pushed
# Represents a single remote OSTree cache.
#
class _OSTreeRemote:
def __init__(self, spec, pull_url, push_url):
self.spec = spec
self.pull_url = pull_url
self.push_url = push_url