blob: 18bef5c2548a91c74868844f307c6ed7ea6cf809 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2016-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import os
import stat
import shlex
import shutil
import tarfile
import itertools
from contextlib import contextmanager
from operator import itemgetter
from tempfile import TemporaryDirectory
from ._exceptions import PipelineError, ArtifactError, ImplError, BstError
from ._message import Message, MessageType
from ._loader import Loader
from . import Consistency
from . import Scope
from . import _site
from . import utils
from ._platform import Platform
from ._project import ProjectRefStorage
from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs
from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
class Planner():
def __init__(self):
self.depth_map = {}
self.visiting_elements = set()
# Here we want to traverse the same element more than once when
# it is reachable from multiple places, with the interest of finding
# the deepest occurance of every element
def plan_element(self, element, depth, ignore_cache):
if element in self.visiting_elements:
# circular dependency, already being processed
return
prev_depth = self.depth_map.get(element)
if prev_depth is not None and prev_depth >= depth:
# element and dependencies already processed at equal or greater depth
return
self.visiting_elements.add(element)
for dep in element.dependencies(Scope.RUN, recurse=False):
self.plan_element(dep, depth, ignore_cache)
# Dont try to plan builds of elements that are cached already
if ignore_cache or (not element._cached() and not element._remotely_cached()):
for dep in element.dependencies(Scope.BUILD, recurse=False):
self.plan_element(dep, depth + 1, ignore_cache)
self.depth_map[element] = depth
self.visiting_elements.remove(element)
def plan(self, roots, ignore_cache=False):
for root in roots:
self.plan_element(root, 0, ignore_cache)
depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
return [item[0] for item in depth_sorted if ignore_cache or not item[0]._cached()]
# Pipeline()
#
# Args:
# context (Context): The Context object
# project (Project): The Project object
# target (str): A bst filename relative to the project directory
# inconsistent (bool): Whether to load the pipeline in a forcefully inconsistent state,
# this is appropriate when source tracking will run and the
# current source refs will not be the effective refs.
# rewritable (bool): Whether the loaded files should be rewritable
# this is a bit more expensive due to deep copies
# use_configured_remote_caches (bool): Whether to connect to configured artifact remotes.
# add_remote_cache (str): Adds an additional artifact remote URL, which is
# prepended to the list of remotes (and thus given highest priority).
#
# The ticker methods will be called with an element name for each tick, a final
# tick with None as the argument is passed to signal that processing of this
# stage has terminated.
#
# Raises:
# LoadError
# PluginError
# SourceError
# ElementError
# ProgramNotFoundError
#
class Pipeline():
def __init__(self, context, project, targets, except_, rewritable=False):
self.context = context
self.project = project
self.session_elements = 0
self.total_elements = 0
self.unused_workspaces = []
self._resolved_elements = {}
self._redundant_refs = []
# Load selected platform
Platform._create_instance(context, project)
self.platform = Platform.get_platform()
self.artifacts = self.platform.artifactcache
self.loader = Loader(self.project, targets + except_)
with self.timed_activity("Loading pipeline", silent_nested=True):
meta_elements = self.loader.load(rewritable, None)
# Resolve the real elements now that we've resolved the project
with self.timed_activity("Resolving pipeline"):
resolved_elements = [self.resolve(meta_element)
for meta_element in meta_elements]
# Now warn about any redundant source references which may have
# been discovered in the resolve() phase.
if self._redundant_refs:
detail = "The following inline specified source references will be ignored:\n\n"
lines = [
"{}:{}".format(source._get_provenance(), ref)
for source, ref in self._redundant_refs
]
detail += "\n".join(lines)
self.message(MessageType.WARN, "Ignoring redundant source references", detail=detail)
self.targets = resolved_elements[:len(targets)]
self.exceptions = resolved_elements[len(targets):]
def initialize(self, use_configured_remote_caches=False,
add_remote_cache=None, track_elements=None):
# Preflight directly, before ever interrogating caches or
# anything.
self.preflight()
self.total_elements = len(list(self.dependencies(Scope.ALL)))
self.initialize_workspaces()
# Initialize remote artifact caches. We allow the commandline to override
# the user config in some cases (for example `bst push --remote=...`).
has_remote_caches = False
if add_remote_cache:
self.artifacts.set_remotes([ArtifactCacheSpec(add_remote_cache, push=True)])
has_remote_caches = True
if use_configured_remote_caches:
for project in self.context._get_projects():
artifact_caches = configured_remote_artifact_cache_specs(self.context, project)
if artifact_caches: # artifact_caches is a list of ArtifactCacheSpec instances
self.artifacts.set_remotes(artifact_caches, project=project)
has_remote_caches = True
if has_remote_caches:
self.initialize_remote_caches()
self.resolve_cache_keys(track_elements)
def preflight(self):
for plugin in self.dependencies(Scope.ALL, include_sources=True):
try:
plugin._preflight()
except BstError as e:
# Prepend the plugin identifier string to the error raised by
# the plugin so that users can more quickly identify the issue
# that a given plugin is encountering.
#
# Propagate the original error reason for test case purposes
#
raise PipelineError("{}: {}".format(plugin, e), reason=e.reason) from e
def initialize_workspaces(self):
for element_name, workspace in self.project._list_workspaces():
for target in self.targets:
element = target.search(Scope.ALL, element_name)
if element is None:
self.unused_workspaces.append((element_name, workspace))
continue
self.project._set_workspace(element, workspace)
def initialize_remote_caches(self):
def remote_failed(url, error):
self.message(MessageType.WARN, "Failed to fetch remote refs from {}: {}".format(url, error))
with self.timed_activity("Initializing remote caches", silent_nested=True):
self.artifacts.initialize_remotes(on_failure=remote_failed)
def resolve_cache_keys(self, track_elements):
if track_elements:
track_elements = self.get_elements_to_track(track_elements)
with self.timed_activity("Resolving cached state", silent_nested=True):
for element in self.dependencies(Scope.ALL):
if track_elements and element in track_elements:
# Load the pipeline in an explicitly inconsistent state, use
# this for pipelines with tracking queues enabled.
element._schedule_tracking()
# Determine initial element state. This may resolve cache keys
# and interrogate the artifact cache.
element._update_state()
# Generator function to iterate over elements and optionally
# also iterate over sources.
#
def dependencies(self, scope, include_sources=False):
# Keep track of 'visited' in this scope, so that all targets
# share the same context.
visited = {}
for target in self.targets:
for element in target.dependencies(scope, visited=visited):
if include_sources:
for source in element.sources():
yield source
yield element
# Asserts that the pipeline is in a consistent state, that
# is to say that all sources are consistent and can at least
# be fetched.
#
# Consequently it also means that cache keys can be resolved.
#
def assert_consistent(self, toplevel):
inconsistent = []
with self.timed_activity("Checking sources"):
for element in toplevel:
if element._consistency() == Consistency.INCONSISTENT:
inconsistent.append(element)
if inconsistent:
detail = "Exact versions are missing for the following elements\n" + \
"Try tracking these elements first with `bst track`\n\n"
for element in inconsistent:
detail += " " + element.name + "\n"
raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
# assert_junction_tracking()
#
# Raises an error if tracking is attempted on junctioned elements and
# a project.refs file is not enabled for the toplevel project.
#
# Args:
# elements (list of Element): The list of elements to be tracked
# build (bool): Whether this is being called for `bst build`, otherwise `bst track`
#
# The `build` argument is only useful for suggesting an appropriate
# alternative to the user
#
def assert_junction_tracking(self, elements, *, build):
# We can track anything if the toplevel project uses project.refs
#
if self.project._ref_storage == ProjectRefStorage.PROJECT_REFS:
return
# Ideally, we would want to report every cross junction element but not
# their dependencies, unless those cross junction elements dependencies
# were also explicitly requested on the command line.
#
# But this is too hard, lets shoot for a simple error.
for element in elements:
element_project = element._get_project()
if element_project is not self.project:
suggestion = '--except'
if build:
suggestion = '--track-except'
detail = "Requested to track sources across junction boundaries\n" + \
"in a project which does not use separate source references.\n\n" + \
"Try using `{}` arguments to limit the scope of tracking.".format(suggestion)
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
# Generator function to iterate over only the elements
# which are required to build the pipeline target, omitting
# cached elements. The elements are yielded in a depth sorted
# ordering for optimal build plans
def plan(self, except_=True):
build_plan = Planner().plan(self.targets)
if except_:
build_plan = self.remove_elements(build_plan)
for element in build_plan:
yield element
# Local message propagator
#
def message(self, message_type, message, **kwargs):
args = dict(kwargs)
self.context._message(
Message(None, message_type, message, **args))
# Local timed activities, announces the jobs as well
#
@contextmanager
def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
with self.context._timed_activity(activity_name,
detail=detail,
silent_nested=silent_nested):
yield
# Internal: Instantiates plugin-provided Element and Source instances
# from MetaElement and MetaSource objects
#
# This has a side effect of populating `self._redundant_refs` so
# we can later print a warning
#
def resolve(self, meta_element):
if meta_element in self._resolved_elements:
return self._resolved_elements[meta_element]
element = meta_element.project._create_element(meta_element.kind,
self.artifacts,
meta_element)
self._resolved_elements[meta_element] = element
# resolve dependencies
for dep in meta_element.dependencies:
element._add_dependency(self.resolve(dep), Scope.RUN)
for dep in meta_element.build_dependencies:
element._add_dependency(self.resolve(dep), Scope.BUILD)
# resolve sources
for meta_source in meta_element.sources:
source = meta_element.project._create_source(meta_source.kind, meta_source)
redundant_ref = source._load_ref()
element._add_source(source)
# Collect redundant refs for a warning message
if redundant_ref is not None:
self._redundant_refs.append((source, redundant_ref))
return element
#############################################################
# Commands #
#############################################################
# track()
#
# Trackes all the sources of all the elements in the pipeline,
# i.e. all of the elements which the target somehow depends on.
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# dependencies (list): List of elements to track
#
# If no error is encountered while tracking, then the project files
# are rewritten inline.
#
def track(self, scheduler, dependencies):
dependencies = list(dependencies)
track = TrackQueue()
track.enqueue(dependencies)
self.session_elements = len(dependencies)
self.assert_junction_tracking(dependencies, build=False)
self.message(MessageType.START, "Starting track")
elapsed, status = scheduler.run([track])
changed = len(track.processed_elements)
if status == SchedStatus.ERROR:
self.message(MessageType.FAIL, "Track failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(MessageType.WARN,
"Terminated after updating {} source references".format(changed),
elapsed=elapsed)
raise PipelineError()
else:
self.message(MessageType.SUCCESS,
"Updated {} source references".format(changed),
elapsed=elapsed)
# fetch()
#
# Fetches sources on the pipeline.
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# dependencies (list): List of elements to fetch
# track_first (bool): Track new source references before fetching
#
def fetch(self, scheduler, dependencies, track_first):
plan = dependencies
# Assert that we have a consistent pipeline, or that
# the track option will make it consistent
if not track_first:
self.assert_consistent(plan)
# Filter out elements with cached sources, we already have them.
cached = [elt for elt in plan if elt._consistency() == Consistency.CACHED]
plan = [elt for elt in plan if elt not in cached]
self.session_elements = len(plan)
fetch = FetchQueue()
if track_first:
track = TrackQueue()
track.enqueue(plan)
queues = [track, fetch]
else:
track = None
fetch.enqueue(plan)
queues = [fetch]
self.message(MessageType.START, "Fetching {} elements".format(len(plan)))
elapsed, status = scheduler.run(queues)
fetched = len(fetch.processed_elements)
if status == SchedStatus.ERROR:
self.message(MessageType.FAIL, "Fetch failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(MessageType.WARN,
"Terminated after fetching {} elements".format(fetched),
elapsed=elapsed)
raise PipelineError()
else:
self.message(MessageType.SUCCESS,
"Fetched {} elements".format(fetched),
elapsed=elapsed)
def get_elements_to_track(self, track_targets):
planner = Planner()
target_elements = [e for e in self.dependencies(Scope.ALL)
if e.name in track_targets]
track_elements = planner.plan(target_elements, ignore_cache=True)
return self.remove_elements(track_elements)
# build()
#
# Builds (assembles) elements in the pipeline.
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# build_all (bool): Whether to build all elements, or only those
# which are required to build the target.
# track_first (list): Elements whose sources to track prior to
# building
#
def build(self, scheduler, build_all, track_first):
if self.unused_workspaces:
self.message(MessageType.WARN, "Unused workspaces",
detail="\n".join([el for el, _
in self.unused_workspaces]))
# We set up two plans; one to track elements, the other to
# build them once tracking has finished. The first plan
# contains elements from track_first, the second contains the
# target elements.
#
# The reason we can't use one plan is that the tracking
# elements may consist of entirely different elements.
track_plan = []
if track_first:
track_plan = self.get_elements_to_track(track_first)
self.assert_junction_tracking(track_plan, build=True)
if build_all:
plan = self.dependencies(Scope.ALL)
else:
plan = self.plan(except_=False)
# We want to start the build queue with any elements that are
# not being tracked first
track_elements = set(track_plan)
plan = [e for e in plan if e not in track_elements]
# Assert that we have a consistent pipeline now (elements in
# track_plan will be made consistent)
self.assert_consistent(plan)
fetch = FetchQueue(skip_cached=True)
build = BuildQueue()
track = None
pull = None
push = None
queues = []
if track_plan:
track = TrackQueue()
queues.append(track)
if self.artifacts.has_fetch_remotes():
pull = PullQueue()
queues.append(pull)
queues.append(fetch)
queues.append(build)
if self.artifacts.has_push_remotes():
push = PushQueue()
queues.append(push)
if track:
queues[0].enqueue(track_plan)
queues[1].enqueue(plan)
else:
queues[0].enqueue(plan)
self.session_elements = len(track_plan) + len(plan)
self.message(MessageType.START, "Starting build")
elapsed, status = scheduler.run(queues)
if status == SchedStatus.ERROR:
self.message(MessageType.FAIL, "Build failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(MessageType.WARN, "Terminated", elapsed=elapsed)
raise PipelineError()
else:
self.message(MessageType.SUCCESS, "Build Complete", elapsed=elapsed)
# checkout()
#
# Checkout the pipeline target artifact to the specified directory
#
# Args:
# directory (str): The directory to checkout the artifact to
# force (bool): Force overwrite files which exist in `directory`
# integrate (bool): Whether to run integration commands
# hardlinks (bool): Whether checking out files hardlinked to
# their artifacts is acceptable
#
def checkout(self, directory, force, integrate, hardlinks):
# We only have one target in a checkout command
target = self.targets[0]
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
if not os.access(directory, os.W_OK):
raise PipelineError("Directory {} not writable".format(directory))
if not force and os.listdir(directory):
raise PipelineError("Checkout directory is not empty: {}"
.format(directory))
# Stage deps into a temporary sandbox first
try:
with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
# Copy or move the sandbox to the target directory
sandbox_root = sandbox.get_directory()
with target.timed_activity("Checking out files in {}".format(directory)):
try:
if hardlinks:
self.checkout_hardlinks(sandbox_root, directory)
else:
utils.copy_files(sandbox_root, directory)
except OSError as e:
raise PipelineError("Failed to checkout files: {}".format(e)) from e
except BstError as e:
raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e),
reason=e.reason) from e
# Helper function for checkout()
#
def checkout_hardlinks(self, sandbox_root, directory):
try:
removed = utils.safe_remove(directory)
except OSError as e:
raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e
if removed:
# Try a simple rename of the sandbox root; if that
# doesnt cut it, then do the regular link files code path
try:
os.rename(sandbox_root, directory)
except OSError:
os.makedirs(directory, exist_ok=True)
utils.link_files(sandbox_root, directory)
else:
utils.link_files(sandbox_root, directory)
# open_workspace
#
# Open a project workspace.
#
# Args:
# directory (str): The directory to stage the source in
# no_checkout (bool): Whether to skip checking out the source
# track_first (bool): Whether to track and fetch first
# force (bool): Whether to ignore contents in an existing directory
#
def open_workspace(self, scheduler, directory, no_checkout, track_first, force):
# When working on workspaces we only have one target
target = self.targets[0]
# Some elements act as proxies for other elements
target = target._get_real_element()
# Ordinarily, this would be done by pipeline initialization, but we need the
# elements initialized before we know which ones to track.
if track_first:
self.resolve_cache_keys([target.name])
workdir = os.path.abspath(directory)
if not list(target.sources()):
build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)]
if not build_depends:
raise PipelineError("The given element has no sources or build-dependencies")
detail = "Try opening a workspace on one of its dependencies instead:\n"
detail += " \n".join(build_depends)
raise PipelineError("The given element has no sources", detail=detail)
# Check for workspace config
if self.project._get_workspace(target.name):
raise PipelineError("Workspace '{}' is already defined."
.format(target.name))
plan = [target]
# Track/fetch if required
queues = []
track = None
if track_first:
track = TrackQueue()
queues.append(track)
if not no_checkout or track_first:
fetch = FetchQueue(skip_cached=True)
queues.append(fetch)
if queues:
queues[0].enqueue(plan)
elapsed, status = scheduler.run(queues)
fetched = len(fetch.processed_elements)
if status == SchedStatus.ERROR:
self.message(MessageType.FAIL, "Tracking failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(MessageType.WARN,
"Terminated after fetching {} elements".format(fetched),
elapsed=elapsed)
raise PipelineError()
else:
self.message(MessageType.SUCCESS,
"Fetched {} elements".format(fetched), elapsed=elapsed)
if not no_checkout and target._consistency() != Consistency.CACHED:
raise PipelineError("Could not stage uncached source. " +
"Use `--track` to track and " +
"fetch the latest version of the " +
"source.")
# Check directory
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
raise PipelineError("Failed to create workspace directory: {}".format(e)) from e
if not no_checkout:
if not force and os.listdir(directory):
raise PipelineError("Checkout directory is not empty: {}".format(directory))
with target.timed_activity("Staging sources to {}".format(directory)):
for source in target.sources():
source._init_workspace(directory)
self.project._set_workspace(target, workdir)
with target.timed_activity("Saving workspace configuration"):
self.project._save_workspace_config()
# close_workspace
#
# Close a project workspace
#
# Args:
# remove_dir (bool) - Whether to remove the associated directory
#
def close_workspace(self, remove_dir):
# When working on workspaces we only have one target
target = self.targets[0]
# Some elements act as proxies for other elements
target = target._get_real_element()
# Remove workspace directory if prompted
if remove_dir:
path = self.project._get_workspace(target.name)
if path is not None:
with target.timed_activity("Removing workspace directory {}"
.format(path)):
try:
shutil.rmtree(path)
except OSError as e:
raise PipelineError("Could not remove '{}': {}"
.format(path, e)) from e
# Delete the workspace config entry
with target.timed_activity("Removing workspace"):
try:
self.project._delete_workspace(target.name)
except KeyError:
raise PipelineError("Workspace '{}' is currently not defined"
.format(target.name))
# Update workspace config
self.project._save_workspace_config()
# Reset source to avoid checking out the (now empty) workspace
for source in target.sources():
source._del_workspace()
# reset_workspace
#
# Reset a workspace to its original state, discarding any user
# changes.
#
# Args:
# scheduler: The app scheduler
# track (bool): Whether to also track the source
# no_checkout (bool): Whether to check out the source (at all)
#
def reset_workspace(self, scheduler, track, no_checkout):
# When working on workspaces we only have one target
target = self.targets[0]
# Some elements act as proxies for other elements
target = target._get_real_element()
workspace_dir = self.project._get_workspace(target.name)
if workspace_dir is None:
raise PipelineError("Workspace '{}' is currently not defined"
.format(target.name))
self.close_workspace(True)
self.open_workspace(scheduler, workspace_dir, no_checkout, track, False)
# pull()
#
# Pulls elements from the pipeline
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# elements (list): List of elements to pull
#
def pull(self, scheduler, elements):
if not self.artifacts.has_fetch_remotes():
raise PipelineError("Not artifact caches available for pulling artifacts")
plan = elements
self.assert_consistent(plan)
self.session_elements = len(plan)
pull = PullQueue()
pull.enqueue(plan)
queues = [pull]
self.message(MessageType.START, "Pulling {} artifacts".format(len(plan)))
elapsed, status = scheduler.run(queues)
pulled = len(pull.processed_elements)
if status == SchedStatus.ERROR:
self.message(MessageType.FAIL, "Pull failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(MessageType.WARN,
"Terminated after pulling {} elements".format(pulled),
elapsed=elapsed)
raise PipelineError()
else:
self.message(MessageType.SUCCESS,
"Pulled {} complete".format(pulled),
elapsed=elapsed)
# push()
#
# Pushes elements in the pipeline
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# elements (list): List of elements to push
#
def push(self, scheduler, elements):
if not self.artifacts.has_push_remotes():
raise PipelineError("No artifact caches available for pushing artifacts")
plan = elements
self.assert_consistent(plan)
self.session_elements = len(plan)
push = PushQueue()
push.enqueue(plan)
queues = [push]
self.message(MessageType.START, "Pushing {} artifacts".format(len(plan)))
elapsed, status = scheduler.run(queues)
pushed = len(push.processed_elements)
if status == SchedStatus.ERROR:
self.message(MessageType.FAIL, "Push failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(MessageType.WARN,
"Terminated after pushing {} elements".format(pushed),
elapsed=elapsed)
raise PipelineError()
else:
self.message(MessageType.SUCCESS,
"Pushed {} complete".format(pushed),
elapsed=elapsed)
# remove_elements():
#
# Internal function
#
# Return what we are left with after the intersection between
# excepted and target elements and their unique dependencies is
# gone.
#
# Args:
# elements (list of elements): The list to remove elements from.
def remove_elements(self, elements):
targeted = list(self.dependencies(Scope.ALL))
visited = []
def find_intersection(element):
if element in visited:
return
visited.append(element)
# Intersection elements are those that are also in
# 'targeted', as long as we don't recurse into them.
if element in targeted:
yield element
else:
for dep in element.dependencies(Scope.ALL, recurse=False):
yield from find_intersection(dep)
# Build a list of 'intersection' elements, i.e. the set of
# elements that lie on the border closest to excepted elements
# between excepted and target elements.
intersection = list(itertools.chain.from_iterable(
find_intersection(element) for element in self.exceptions
))
# Now use this set of elements to traverse the targeted
# elements, except 'intersection' elements and their unique
# dependencies.
queue = []
visited = []
queue.extend(self.targets)
while queue:
element = queue.pop()
if element in visited or element in intersection:
continue
visited.append(element)
queue.extend(element.dependencies(Scope.ALL, recurse=False))
# That looks like a lot, but overall we only traverse (part
# of) the graph twice. This could be reduced to once if we
# kept track of parent elements, but is probably not
# significant.
# Ensure that we return elements in the same order they were
# in before.
return [element for element in elements if element in visited]
# Various commands define a --deps option to specify what elements to
# use in the result, this function reports a list that is appropriate for
# the selected option.
#
def deps_elements(self, mode):
elements = None
if mode == 'none':
elements = self.targets
elif mode == 'plan':
elements = list(self.plan())
else:
if mode == 'all':
scope = Scope.ALL
elif mode == 'build':
scope = Scope.BUILD
elif mode == 'run':
scope = Scope.RUN
elements = list(self.dependencies(scope))
return self.remove_elements(elements)
# source_bundle()
#
# Create a build bundle for the given artifact.
#
# Args:
# directory (str): The directory to checkout the artifact to
#
def source_bundle(self, scheduler, dependencies, force,
track_first, compression, directory):
# source-bundle only supports one target
target = self.targets[0]
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
if compression != "none":
tar_location += "." + compression
# Attempt writing a file to generate a good error message
# early
#
# FIXME: A bit hackish
try:
open(tar_location, mode="x")
os.remove(tar_location)
except IOError as e:
raise PipelineError("Cannot write to {0}: {1}"
.format(tar_location, e)) from e
plan = list(dependencies)
self.fetch(scheduler, plan, track_first)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
# Create a temporary directory to build the source tree in
builddir = target._get_context().builddir
prefix = "{}-".format(target.normal_name)
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
source_directory = os.path.join(tempdir, 'source')
try:
os.makedirs(source_directory)
except OSError as e:
raise PipelineError("Failed to create directory: {}"
.format(e)) from e
# Any elements that don't implement _write_script
# should not be included in the later stages.
plan = [element for element in plan
if self._write_element_script(source_directory, element)]
self._write_element_sources(tempdir, plan)
self._write_build_script(tempdir, plan)
self._collect_sources(tempdir, tar_location,
target.normal_name, compression)
# Write the element build script to the given directory
def _write_element_script(self, directory, element):
try:
element._write_script(directory)
except ImplError:
return False
return True
# Write all source elements to the given directory
def _write_element_sources(self, directory, elements):
for element in elements:
source_dir = os.path.join(directory, "source")
element_source_dir = os.path.join(source_dir, element.normal_name)
element._stage_sources_at(element_source_dir)
# Write a master build script to the sandbox
def _write_build_script(self, directory, elements):
module_string = ""
for element in elements:
module_string += shlex.quote(element.normal_name) + " "
script_path = os.path.join(directory, "build.sh")
with open(_site.build_all_template, "r") as f:
script_template = f.read()
with utils.save_file_atomic(script_path, "w") as script:
script.write(script_template.format(modules=module_string))
os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
# Collect the sources in the given sandbox into a tarfile
def _collect_sources(self, directory, tar_name, element_name, compression):
with self.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
if compression == "none":
permissions = "w:"
else:
permissions = "w:" + compression
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
def cleanup(self):
if self.loader:
self.loader.cleanup()