blob: ba105d8ffe103f7930b2dc8d9810af6910299a62 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2016 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import datetime
import os
import stat
import shlex
import shutil
import tarfile
from operator import itemgetter
from tempfile import TemporaryDirectory
from pluginbase import PluginBase
from .exceptions import _BstError, _ArtifactError
from ._message import Message, MessageType
from ._elementfactory import ElementFactory
from ._loader import Loader
from ._sourcefactory import SourceFactory
from . import Consistency, ImplError, LoadError
from . import Scope
from . import _site
from . import _yaml, utils
from ._platform import Platform
from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
# Internal exception raised when a pipeline fails
#
class PipelineError(_BstError):
def __init__(self, message=None):
# The empty string should never appear to a user,
# this only allows us to treat this internal error as
# a _BstError from the frontend.
if message is None:
message = ""
super(PipelineError, self).__init__(message)
class Planner():
def __init__(self):
self.depth_map = {}
self.visiting_elements = set()
# Here we want to traverse the same element more than once when
# it is reachable from multiple places, with the interest of finding
# the deepest occurance of every element
def plan_element(self, element, depth):
if element in self.visiting_elements:
# circular dependency, already being processed
return
prev_depth = self.depth_map.get(element)
if prev_depth is not None and prev_depth >= depth:
# element and dependencies already processed at equal or greater depth
return
self.visiting_elements.add(element)
for dep in element.dependencies(Scope.RUN, recurse=False):
self.plan_element(dep, depth)
# Dont try to plan builds of elements that are cached already
if not element._cached() and not element._remotely_cached():
for dep in element.dependencies(Scope.BUILD, recurse=False):
self.plan_element(dep, depth + 1)
self.depth_map[element] = depth
self.visiting_elements.remove(element)
def plan(self, root):
self.plan_element(root, 0)
depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
return [item[0] for item in depth_sorted if not item[0]._cached()]
# Pipeline()
#
# Args:
# context (Context): The Context object
# project (Project): The Project object
# target (str): A bst filename relative to the project directory
# inconsistent (bool): Whether to load the pipeline in a forcefully inconsistent state,
# this is appropriate when source tracking will run and the
# current source refs will not be the effective refs.
# rewritable (bool): Whether the loaded files should be rewritable
# this is a bit more expensive due to deep copies
# load_ticker (callable): A function which will be called for each loaded element
# resolve_ticker (callable): A function which will be called for each resolved element
# cache_ticker (callable): A function which will be called for each element
# while interrogating caches
#
# The ticker methods will be called with an element name for each tick, a final
# tick with None as the argument is passed to signal that processing of this
# stage has terminated.
#
# Raises:
# LoadError
# PluginError
# SourceError
# ElementError
# ProgramNotFoundError
#
class Pipeline():
def __init__(self, context, project, target,
inconsistent=False,
rewritable=False,
load_ticker=None,
resolve_ticker=None,
remote_ticker=None,
cache_ticker=None):
self.context = context
self.project = project
self.session_elements = 0
self.total_elements = 0
self.unused_workspaces = []
loader = Loader(self.project.element_path, target,
self.project._options,
context.host_arch, context.target_arch)
meta_element = loader.load(rewritable, load_ticker)
if load_ticker:
load_ticker(None)
# Load selected platform
self.platform = Platform.get_platform(context, project)
self.artifacts = self.platform.artifactcache
# Create the factories after resolving the project
pluginbase = PluginBase(package='buildstream.plugins')
self.element_factory = ElementFactory(pluginbase, project._plugin_element_paths)
self.source_factory = SourceFactory(pluginbase, project._plugin_source_paths)
# Resolve the real elements now that we've resolved the project
self.target = self.resolve(meta_element, ticker=resolve_ticker)
if resolve_ticker:
resolve_ticker(None)
# Preflight directly after resolving elements, before ever interrogating
# caches or anything.
for plugin in self.dependencies(Scope.ALL, include_sources=True):
plugin.preflight()
self.total_elements = len(list(self.dependencies(Scope.ALL)))
for element_name, source, workspace in project._list_workspaces():
element = self.target.search(Scope.ALL, element_name)
if element is None:
self.unused_workspaces.append((element_name, source, workspace))
continue
self.project._set_workspace(element, source, workspace)
if self.artifacts.can_fetch():
try:
if remote_ticker:
remote_ticker(context.artifact_pull)
self.artifacts.fetch_remote_refs()
except _ArtifactError:
self.message(self.target, MessageType.WARN, "Failed to fetch remote refs")
self.artifacts.set_offline()
for element in self.dependencies(Scope.ALL):
if cache_ticker:
cache_ticker(element.name)
if inconsistent:
# Load the pipeline in an explicitly inconsistent state, use
# this for pipelines with tracking queues enabled.
element._force_inconsistent()
else:
# Resolve cache keys and interrogate the artifact cache
# for the first time.
element._cached()
if cache_ticker:
cache_ticker(None)
# Generator function to iterate over elements and optionally
# also iterate over sources.
#
def dependencies(self, scope, include_sources=False):
for element in self.target.dependencies(scope):
if include_sources:
for source in element.sources():
yield source
yield element
# Asserts that the pipeline is in a consistent state, that
# is to say that all sources are consistent and can at least
# be fetched.
#
# Consequently it also means that cache keys can be resolved.
#
def assert_consistent(self, toplevel):
inconsistent = []
for element in toplevel:
if element._consistency() == Consistency.INCONSISTENT:
inconsistent.append(element)
if inconsistent:
detail = "Exact versions are missing for the following elements\n" + \
"Try tracking these elements first with `bst track`\n\n"
for element in inconsistent:
detail += " " + element.name + "\n"
self.message(self.target, MessageType.ERROR, "Inconsistent pipeline", detail=detail)
raise PipelineError()
# Generator function to iterate over only the elements
# which are required to build the pipeline target, omitting
# cached elements. The elements are yielded in a depth sorted
# ordering for optimal build plans
def plan(self):
build_plan = Planner().plan(self.target)
for element in build_plan:
yield element
# Local message propagator
#
def message(self, plugin, message_type, message, **kwargs):
args = dict(kwargs)
self.context._message(
Message(plugin._get_unique_id(),
message_type,
message,
**args))
# Internal: Instantiates plugin-provided Element and Source instances
# from MetaElement and MetaSource objects
#
def resolve(self, meta_element, resolved=None, ticker=None):
if resolved is None:
resolved = {}
if meta_element in resolved:
return resolved[meta_element]
if ticker:
ticker(meta_element.name)
element = self.element_factory.create(meta_element.kind,
self.context,
self.project,
self.artifacts,
meta_element)
resolved[meta_element] = element
# resolve dependencies
for dep in meta_element.dependencies:
element._add_dependency(self.resolve(dep, resolved=resolved, ticker=ticker), Scope.RUN)
for dep in meta_element.build_dependencies:
element._add_dependency(self.resolve(dep, resolved=resolved, ticker=ticker), Scope.BUILD)
# resolve sources
for meta_source in meta_element.sources:
element._add_source(
self.source_factory.create(meta_source.kind,
self.context,
self.project,
meta_source)
)
return element
# Internal: If a remote artifact cache is configured for pushing, check
# that it actually works. Returns True if it works, False otherwise.
def can_push_remote_artifact_cache(self):
if self.artifacts.can_push():
starttime = datetime.datetime.now()
self.message(self.target, MessageType.START, "Checking connectivity to remote artifact cache")
try:
self.artifacts.preflight()
except _ArtifactError as e:
self.message(self.target, MessageType.WARN, str(e),
elapsed=datetime.datetime.now() - starttime)
return False
self.message(self.target, MessageType.SUCCESS, "Connectivity OK",
elapsed=datetime.datetime.now() - starttime)
return True
else:
return False
#############################################################
# Commands #
#############################################################
# track()
#
# Trackes all the sources of all the elements in the pipeline,
# i.e. all of the elements which the target somehow depends on.
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# dependencies (list): List of elements to track
# except_ (list): List of elements to except from tracking
#
# If no error is encountered while tracking, then the project files
# are rewritten inline.
#
def track(self, scheduler, dependencies):
dependencies = list(dependencies)
track = TrackQueue()
track.enqueue(dependencies)
self.session_elements = len(dependencies)
self.message(self.target, MessageType.START, "Starting track")
elapsed, status = scheduler.run([track])
changed = len(track.processed_elements)
if status == SchedStatus.ERROR:
self.message(self.target, MessageType.FAIL, "Track failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(self.target, MessageType.WARN,
"Terminated after updating {} source references".format(changed),
elapsed=elapsed)
raise PipelineError()
else:
self.message(self.target, MessageType.SUCCESS,
"Updated {} source references".format(changed),
elapsed=elapsed)
# fetch()
#
# Fetches sources on the pipeline.
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# dependencies (list): List of elements to fetch
# track_first (bool): Track new source references before fetching
# except_ (list): List of elements to except from fetching
#
def fetch(self, scheduler, dependencies, track_first):
plan = dependencies
# Assert that we have a consistent pipeline, or that
# the track option will make it consistent
if not track_first:
self.assert_consistent(plan)
# Filter out elements with cached sources, we already have them.
cached = [elt for elt in plan if elt._consistency() == Consistency.CACHED]
plan = [elt for elt in plan if elt not in cached]
self.session_elements = len(plan)
fetch = FetchQueue()
if track_first:
track = TrackQueue()
track.enqueue(plan)
queues = [track, fetch]
else:
track = None
fetch.enqueue(plan)
queues = [fetch]
self.message(self.target, MessageType.START, "Fetching {} elements".format(len(plan)))
elapsed, status = scheduler.run(queues)
fetched = len(fetch.processed_elements)
if status == SchedStatus.ERROR:
self.message(self.target, MessageType.FAIL, "Fetch failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(self.target, MessageType.WARN,
"Terminated after fetching {} elements".format(fetched),
elapsed=elapsed)
raise PipelineError()
else:
self.message(self.target, MessageType.SUCCESS,
"Fetched {} elements".format(fetched),
elapsed=elapsed)
# build()
#
# Builds (assembles) elements in the pipeline.
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# build_all (bool): Whether to build all elements, or only those
# which are required to build the target.
# track_first (bool): Track sources before fetching and building (implies build_all)
#
def build(self, scheduler, build_all, track_first):
if len(self.unused_workspaces) > 0:
self.message(self.target, MessageType.WARN, "Unused workspaces",
detail="\n".join([el + "-" + str(src) for el, src, _
in self.unused_workspaces]))
if build_all or track_first:
plan = list(self.dependencies(Scope.ALL))
else:
plan = list(self.plan())
# Assert that we have a consistent pipeline, or that
# the track option will make it consistent
if not track_first:
self.assert_consistent(plan)
fetch = FetchQueue(skip_cached=True)
build = BuildQueue()
track = None
pull = None
push = None
queues = []
if track_first:
track = TrackQueue()
queues.append(track)
if self.artifacts.can_fetch():
pull = PullQueue()
queues.append(pull)
queues.append(fetch)
queues.append(build)
if self.can_push_remote_artifact_cache():
push = PushQueue()
queues.append(push)
queues[0].enqueue(plan)
self.session_elements = len(plan)
self.message(self.target, MessageType.START, "Starting build")
elapsed, status = scheduler.run(queues)
built = len(build.processed_elements)
if status == SchedStatus.ERROR:
self.message(self.target, MessageType.FAIL, "Build failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(self.target, MessageType.WARN, "Terminated", elapsed=elapsed)
raise PipelineError()
else:
self.message(self.target, MessageType.SUCCESS, "Build Complete", elapsed=elapsed)
# checkout()
#
# Checkout the pipeline target artifact to the specified directory
#
# Args:
# directory (str): The directory to checkout the artifact to
# force (bool): Force overwrite files which exist in `directory`
#
def checkout(self, directory, force):
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
if not os.access(directory, os.W_OK):
raise PipelineError("Directory {} not writable".format(directory))
if not force and os.listdir(directory):
raise PipelineError("Checkout directory is not empty: {}"
.format(directory))
# BuildStream will one day be able to run host-incompatible binaries
# by using a QEMU sandbox, but for now we need to disable integration
# commands for cross-build artifacts.
can_integrate = (self.context.host_arch == self.context.target_arch)
if not can_integrate:
self.message(self.target, MessageType.WARN,
"Host-incompatible checkout -- no integration commands can be run")
# Stage deps into a temporary sandbox first
with self.target._prepare_sandbox(Scope.RUN, None, integrate=can_integrate) as sandbox:
# Make copies from the sandbox into to the desired directory
sandbox_root = sandbox.get_directory()
with self.target.timed_activity("Copying files to {}".format(directory)):
try:
utils.copy_files(sandbox_root, directory)
except OSError as e:
raise PipelineError("Failed to copy files: {}".format(e)) from e
# open_workspace
#
# Open a project workspace.
#
# Args:
# directory (str): The directory to stage the source in
# source_index (int): The index of the source to stage
# no_checkout (bool): Whether to skip checking out the source
# track_first (bool): Whether to track and fetch first
# force (bool): Whether to ignore contents in an existing directory
#
def open_workspace(self, scheduler, directory, source_index, no_checkout, track_first, force):
workdir = os.path.abspath(directory)
sources = list(self.target.sources())
source_index = self.validate_workspace_index(source_index)
# Check directory
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
raise PipelineError("Failed to create workspace directory: {}".format(e)) from e
if not no_checkout and not force and os.listdir(directory):
raise PipelineError("Checkout directory is not empty: {}".format(directory))
# Check for workspace config
if self.project._get_workspace(self.target.name, source_index):
raise PipelineError("Workspace '{}' is already defined."
.format(self.target.name + " - " + str(source_index)))
plan = [self.target]
# Track/fetch if required
queues = []
track = None
if track_first:
track = TrackQueue()
queues.append(track)
if not no_checkout or track_first:
fetch = FetchQueue(skip_cached=True)
queues.append(fetch)
if len(queues) > 0:
queues[0].enqueue(plan)
elapsed, status = scheduler.run(queues)
fetched = len(fetch.processed_elements)
if status == SchedStatus.ERROR:
self.message(self.target, MessageType.FAIL, "Tracking failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(self.target, MessageType.WARN,
"Terminated after fetching {} elements".format(fetched),
elapsed=elapsed)
raise PipelineError()
else:
self.message(self.target, MessageType.SUCCESS,
"Fetched {} elements".format(fetched), elapsed=elapsed)
if not no_checkout:
source = sources[source_index]
with self.target.timed_activity("Staging source to {}".format(directory)):
if source.get_consistency() != Consistency.CACHED:
raise PipelineError("Could not stage uncached source. " +
"Use `--track` to track and " +
"fetch the latest version of the " +
"source.")
source._stage(directory)
self.project._set_workspace(self.target, source_index, workdir)
with self.target.timed_activity("Saving workspace configuration"):
self.project._save_workspace_config()
# close_workspace
#
# Close a project workspace
#
# Args:
# source_index (int) - The index of the source
# remove_dir (bool) - Whether to remove the associated directory
#
def close_workspace(self, source_index, remove_dir):
source_index = self.validate_workspace_index(source_index)
# Remove workspace directory if prompted
if remove_dir:
path = self.project._get_workspace(self.target.name, source_index)
if path is not None:
with self.target.timed_activity("Removing workspace directory {}"
.format(path)):
try:
shutil.rmtree(path)
except OSError as e:
raise PipelineError("Could not remove '{}': {}"
.format(path, e)) from e
# Delete the workspace config entry
with self.target.timed_activity("Removing workspace"):
try:
self.project._delete_workspace(self.target.name, source_index)
except KeyError:
raise PipelineError("Workspace '{}' is currently not defined"
.format(self.target.name + " - " + str(source_index)))
# Update workspace config
self.project._save_workspace_config()
# Reset source to avoid checking out the (now empty) workspace
source = list(self.target.sources())[source_index]
source._del_workspace()
# reset_workspace
#
# Reset a workspace to its original state, discarding any user
# changes.
#
# Args:
# scheduler: The app scheduler
# source_index (int): The index of the source to reset
# track (bool): Whether to also track the source
# no_checkout (bool): Whether to check out the source (at all)
#
def reset_workspace(self, scheduler, source_index, track, no_checkout):
source_index = self.validate_workspace_index(source_index)
workspace_dir = self.project._get_workspace(self.target.name, source_index)
if workspace_dir is None:
raise PipelineError("Workspace '{}' is currently not defined"
.format(self.target.name + " - " + str(source_index)))
self.close_workspace(source_index, True)
self.open_workspace(scheduler, workspace_dir, source_index, no_checkout,
track, False)
# pull()
#
# Pulls elements from the pipeline
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# elements (list): List of elements to pull
#
def pull(self, scheduler, elements):
if not self.artifacts.can_fetch():
raise PipelineError("Not configured for pulling artifacts")
plan = elements
self.assert_consistent(plan)
self.session_elements = len(plan)
pull = PullQueue()
pull.enqueue(plan)
queues = [pull]
self.message(self.target, MessageType.START, "Pulling {} artifacts".format(len(plan)))
elapsed, status = scheduler.run(queues)
pulled = len(pull.processed_elements)
if status == SchedStatus.ERROR:
self.message(self.target, MessageType.FAIL, "Pull failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(self.target, MessageType.WARN,
"Terminated after pulling {} elements".format(pulled),
elapsed=elapsed)
raise PipelineError()
else:
self.message(self.target, MessageType.SUCCESS,
"Pulled {} complete".format(pulled),
elapsed=elapsed)
# push()
#
# Pushes elements in the pipeline
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# elements (list): List of elements to push
#
def push(self, scheduler, elements):
if not self.artifacts.can_push():
raise PipelineError("Not configured for pushing artifacts")
plan = elements
self.assert_consistent(plan)
self.session_elements = len(plan)
push = PushQueue()
push.enqueue(plan)
queues = [push]
self.message(self.target, MessageType.START, "Pushing {} artifacts".format(len(plan)))
elapsed, status = scheduler.run(queues)
pushed = len(push.processed_elements)
if status == SchedStatus.ERROR:
self.message(self.target, MessageType.FAIL, "Push failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
self.message(self.target, MessageType.WARN,
"Terminated after pushing {} elements".format(pushed),
elapsed=elapsed)
raise PipelineError()
else:
self.message(self.target, MessageType.SUCCESS,
"Pushed {} complete".format(pushed),
elapsed=elapsed)
# remove_elements():
#
# Internal function
#
# Returns all elements to be removed from the given list of
# elements when the given removed elements and their unique
# dependencies are removed.
#
# Args:
# elements (list of elements): The graph to sever elements from.
# removed (list of strings): Names of the elements to remove.
def remove_elements(self, tree, removed):
if removed is None:
removed = []
to_remove = set()
tree = list(tree)
# Find all elements that might need to be removed.
def search_tree(element_name):
for element in tree:
if element.name == element_name:
return element
return None
for element_name in removed:
element = search_tree(element_name)
if element is None:
raise PipelineError("No element named {}".format(element_name))
to_remove.update(element.dependencies(Scope.ALL))
old_to_remove = set()
while old_to_remove != to_remove:
old_to_remove = to_remove
# Of these, find all elements that are not a dependency of
# elements still in use.
for element in tree:
if element.name not in removed and element not in to_remove:
to_remove = to_remove.difference(element.dependencies(Scope.ALL, recurse=False))
to_remove = to_remove.union([e for e in tree if e.name in removed])
return [element for element in tree if element not in to_remove]
def validate_workspace_index(self, source_index):
sources = list(self.target.sources())
# Validate source_index
if len(sources) < 1:
raise PipelineError("The given element has no sources")
if len(sources) == 1 and source_index is None:
source_index = 0
if source_index is None:
raise PipelineError("An index needs to be specified for elements with more than one source")
return source_index
# Various commands define a --deps option to specify what elements to
# use in the result, this function reports a list that is appropriate for
# the selected option.
#
def deps_elements(self, mode, except_=None):
elements = None
if mode == 'none':
elements = [self.target]
elif mode == 'plan':
elements = list(self.plan())
else:
if mode == 'all':
scope = Scope.ALL
elif mode == 'build':
scope = Scope.BUILD
elif mode == 'run':
scope = Scope.RUN
elements = list(self.dependencies(scope))
return self.remove_elements(elements, except_)
# source_bundle()
#
# Create a build bundle for the given artifact.
#
# Args:
# directory (str): The directory to checkout the artifact to
#
def source_bundle(self, scheduler, dependencies, force,
track_first, compression, except_, directory):
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, self.target.normal_name + ".tar")
if compression != "none":
tar_location += "." + compression
# Attempt writing a file to generate a good error message
# early
#
# FIXME: A bit hackish
try:
open(tar_location, mode="x")
os.remove(tar_location)
except IOError as e:
raise PipelineError("Cannot write to {0}: {1}"
.format(tar_location, e)) from e
plan = list(dependencies)
self.fetch(scheduler, plan, track_first)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
# Create a temporary directory to build the source tree in
builddir = self.target.get_context().builddir
prefix = "{}-".format(self.target.normal_name)
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
source_directory = os.path.join(tempdir, 'source')
try:
os.makedirs(source_directory)
except e:
raise PipelineError("Failed to create directory: {}"
.format(e)) from e
# Any elements that don't implement _write_script
# should not be included in the later stages.
plan = [element for element in plan
if self._write_element_script(source_directory, element)]
self._write_element_sources(tempdir, plan)
self._write_build_script(tempdir, plan)
self._collect_sources(tempdir, tar_location,
self.target.normal_name, compression)
# Write the element build script to the given directory
def _write_element_script(self, directory, element):
try:
element._write_script(directory)
except ImplError:
return False
return True
# Write all source elements to the given directory
def _write_element_sources(self, directory, elements):
for element in elements:
source_dir = os.path.join(directory, "source")
element_source_dir = os.path.join(source_dir, element.normal_name)
element._stage_sources_at(element_source_dir)
# Write a master build script to the sandbox
def _write_build_script(self, directory, elements):
module_string = ""
for element in elements:
module_string += shlex.quote(element.normal_name) + " "
script_path = os.path.join(directory, "build.sh")
with open(_site.build_all_template, "r") as f:
script_template = f.read()
with open(script_path, "w") as script:
script.write(script_template.format(modules=module_string))
os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
# Collect the sources in the given sandbox into a tarfile
def _collect_sources(self, directory, tar_name, element_name, compression):
with self.target.timed_activity("Creating tarball {}".format(tar_name)):
if compression == "none":
permissions = "w:"
else:
permissions = "w:" + compression
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)