# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import re
import fnmatch
import glob
import time
import logging
import mimetypes
import subprocess
import textwrap
from io import StringIO
from pathlib import Path
from datetime import date
import jinja2
from ruamel.yaml import YAML
try:
import github3
_have_github3 = True
except ImportError:
github3 = object
_have_github3 = False
try:
import pygit2
except ImportError:
PygitRemoteCallbacks = object
else:
PygitRemoteCallbacks = pygit2.RemoteCallbacks
from ..utils.source import ArrowSources
for pkg in ["requests", "urllib3", "github3"]:
logging.getLogger(pkg).setLevel(logging.WARNING)
logger = logging.getLogger("crossbow")
class CrossbowError(Exception):
pass
def _flatten(mapping):
"""Converts a hierarchical mapping to a flat dictionary"""
result = {}
for k, v in mapping.items():
if isinstance(v, dict):
for ik, iv in _flatten(v).items():
ik = ik if isinstance(ik, tuple) else (ik,)
result[(k,) + ik] = iv
elif isinstance(v, list):
            for ik, iv in enumerate(v):
ik = ik if isinstance(ik, tuple) else (ik,)
result[(k,) + ik] = iv
else:
result[(k,)] = v
return result
def _unflatten(mapping):
"""Converts a flat tuple => object mapping to hierarchical one"""
result = {}
for path, value in mapping.items():
parents, leaf = path[:-1], path[-1]
# create the hierarchy until we reach the leaf value
temp = result
for parent in parents:
temp.setdefault(parent, {})
temp = temp[parent]
# set the leaf value
temp[leaf] = value
return result
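# A small sketch of how the two helpers above relate (values chosen
# arbitrarily for illustration); for dict-only inputs they are inverses:
#
#   _flatten({'a': {'b': 1}, 'c': 2})       == {('a', 'b'): 1, ('c',): 2}
#   _unflatten({('a', 'b'): 1, ('c',): 2})  == {'a': {'b': 1}, 'c': 2}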
def _unflatten_tree(files):
"""Converts a flat path => object mapping to a hierarchical directories
Input:
{
'path/to/file.a': a_content,
'path/to/file.b': b_content,
'path/file.c': c_content
}
Output:
{
'path': {
'to': {
'file.a': a_content,
'file.b': b_content
},
'file.c': c_content
}
}
"""
files = {tuple(k.split('/')): v for k, v in files.items()}
return _unflatten(files)
def _render_jinja_template(searchpath, template, params):
def format_all(items, pattern):
return [pattern.format(item) for item in items]
loader = jinja2.FileSystemLoader(searchpath)
env = jinja2.Environment(loader=loader, trim_blocks=True,
lstrip_blocks=True,
undefined=jinja2.StrictUndefined)
env.filters['format_all'] = format_all
template = env.get_template(template)
return template.render(**params)
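# Example usage (the template name and params are hypothetical): given a file
# templates/job.yml.j2 containing `{{ tasks | format_all("build-{}") }}`,
#
#   _render_jinja_template('templates', 'job.yml.j2',
#                          params={'tasks': ['lin', 'win']})
#
# renders `['build-lin', 'build-win']` via the custom format_all filter.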
# configurations for setting up branch skipping
# - appveyor has a feature to skip builds without an appveyor.yml
# - travis reads from the master branch and applies the rules
# - circle requires the configuration to be present on all branches, even ones
# that are configured to be skipped
# - azure skips branches without azure-pipelines.yml by default
# - github skips branches without .github/workflows/ by default
_default_travis_yml = """
branches:
only:
- master
- /.*-travis-.*/
os: linux
dist: trusty
language: generic
"""
_default_circle_yml = """
version: 2
jobs:
build:
machine: true
workflows:
version: 2
build:
jobs:
- build:
filters:
branches:
only:
- /.*-circle-.*/
"""
_default_tree = {
'.travis.yml': _default_travis_yml,
'.circleci/config.yml': _default_circle_yml
}
class GitRemoteCallbacks(PygitRemoteCallbacks):
def __init__(self, token):
self.token = token
self.attempts = 0
super().__init__()
def push_update_reference(self, refname, message):
pass
def update_tips(self, refname, old, new):
pass
def credentials(self, url, username_from_url, allowed_types):
        # it's a libgit2 bug that it retries the authentication indefinitely
self.attempts += 1
if self.attempts >= 5:
# pygit2 doesn't propagate the exception properly
msg = 'Wrong oauth personal access token'
print(msg)
raise CrossbowError(msg)
if allowed_types & pygit2.credentials.GIT_CREDTYPE_USERPASS_PLAINTEXT:
return pygit2.UserPass(self.token, 'x-oauth-basic')
else:
return None
def _git_ssh_to_https(url):
return url.replace('git@github.com:', 'https://github.com/')
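# e.g. 'git@github.com:apache/arrow.git' becomes
# 'https://github.com/apache/arrow.git'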
class Repo:
"""
Base class for interaction with local git repositories
A high level wrapper used for both reading revision information from
arrow's repository and pushing continuous integration tasks to the queue
repository.
Parameters
----------
require_https : boolean, default False
Raise exception for SSH origin URLs
"""
def __init__(self, path, github_token=None, remote_url=None,
require_https=False):
self.path = Path(path)
self.github_token = github_token
self.require_https = require_https
self._remote_url = remote_url
self._pygit_repo = None
self._github_repo = None # set by as_github_repo()
self._updated_refs = []
def __str__(self):
tpl = textwrap.dedent('''
Repo: {remote}@{branch}
Commit: {head}
''')
return tpl.format(
remote=self.remote_url,
branch=self.branch.branch_name,
head=self.head
)
@property
def repo(self):
if self._pygit_repo is None:
self._pygit_repo = pygit2.Repository(str(self.path))
return self._pygit_repo
@property
def origin(self):
remote = self.repo.remotes['origin']
if self.require_https and remote.url.startswith('git@github.com'):
raise CrossbowError("Change SSH origin URL to HTTPS to use "
"Crossbow: {}".format(remote.url))
return remote
def fetch(self):
refspec = '+refs/heads/*:refs/remotes/origin/*'
self.origin.fetch([refspec])
def push(self, refs=None, github_token=None):
github_token = github_token or self.github_token
if github_token is None:
raise RuntimeError(
'Could not determine GitHub token. Please set the '
'CROSSBOW_GITHUB_TOKEN environment variable to a '
'valid GitHub access token or pass one to --github-token.'
)
callbacks = GitRemoteCallbacks(github_token)
refs = refs or []
try:
self.origin.push(refs + self._updated_refs, callbacks=callbacks)
except pygit2.GitError:
raise RuntimeError('Failed to push updated references, '
'potentially because of credential issues: {}'
.format(self._updated_refs))
else:
            self._updated_refs = []
@property
def head(self):
"""Currently checked out commit's sha"""
return self.repo.head
@property
def branch(self):
"""Currently checked out branch"""
try:
return self.repo.branches[self.repo.head.shorthand]
except KeyError:
return None # detached
@property
def remote(self):
"""Currently checked out branch's remote counterpart"""
try:
return self.repo.remotes[self.branch.upstream.remote_name]
except (AttributeError, KeyError):
return None # cannot detect
@property
def remote_url(self):
"""Currently checked out branch's remote counterpart URL
If an SSH github url is set, it will be replaced by the https
equivalent usable with GitHub OAuth token.
"""
try:
return self._remote_url or _git_ssh_to_https(self.remote.url)
except AttributeError:
return None
@property
def user_name(self):
try:
return next(self.repo.config.get_multivar('user.name'))
except StopIteration:
return os.environ.get('GIT_COMMITTER_NAME', 'unknown')
@property
def user_email(self):
try:
return next(self.repo.config.get_multivar('user.email'))
except StopIteration:
return os.environ.get('GIT_COMMITTER_EMAIL', 'unknown')
@property
def signature(self):
return pygit2.Signature(self.user_name, self.user_email,
int(time.time()))
def create_tree(self, files):
builder = self.repo.TreeBuilder()
for filename, content in files.items():
if isinstance(content, dict):
# create a subtree
tree_id = self.create_tree(content)
builder.insert(filename, tree_id, pygit2.GIT_FILEMODE_TREE)
else:
# create a file
blob_id = self.repo.create_blob(content)
builder.insert(filename, blob_id, pygit2.GIT_FILEMODE_BLOB)
tree_id = builder.write()
return tree_id
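    # For illustration, `files` is the nested mapping produced by
    # _unflatten_tree, e.g.:
    #
    #   repo.create_tree({'.circleci': {'config.yml': _default_circle_yml},
    #                     '.travis.yml': _default_travis_yml})
    #
    # writes a tree with a `.circleci/` subtree plus a top-level file and
    # returns the new tree's id.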
def create_commit(self, files, parents=None, message='',
reference_name=None):
if parents is None:
            # by default use the main branch as the base of the new branch;
            # this is required to reuse the github actions cache across
            # crossbow tasks
commit, _ = self.repo.resolve_refish("master")
parents = [commit.id]
tree_id = self.create_tree(files)
author = committer = self.signature
commit_id = self.repo.create_commit(reference_name, author, committer,
message, tree_id, parents)
return self.repo[commit_id]
def create_branch(self, branch_name, files, parents=None, message='',
signature=None):
# create commit with the passed tree
commit = self.create_commit(files, parents=parents, message=message)
# create branch pointing to the previously created commit
branch = self.repo.create_branch(branch_name, commit)
# append to the pushable references
self._updated_refs.append('refs/heads/{}'.format(branch_name))
return branch
def create_tag(self, tag_name, commit_id, message=''):
tag_id = self.repo.create_tag(tag_name, commit_id,
pygit2.GIT_OBJ_COMMIT, self.signature,
message)
# append to the pushable references
self._updated_refs.append('refs/tags/{}'.format(tag_name))
return self.repo[tag_id]
def file_contents(self, commit_id, file):
commit = self.repo[commit_id]
entry = commit.tree[file]
blob = self.repo[entry.id]
return blob.data
def _parse_github_user_repo(self):
m = re.match(r'.*\/([^\/]+)\/([^\/\.]+)(\.git)?$', self.remote_url)
if m is None:
raise CrossbowError(
"Unable to parse the github owner and repository from the "
"repository's remote url '{}'".format(self.remote_url)
)
user, repo = m.group(1), m.group(2)
return user, repo
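    # e.g. a remote_url of 'https://github.com/apache/arrow.git' is parsed
    # into the ('apache', 'arrow') owner/repository pair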
def as_github_repo(self, github_token=None):
"""Converts it to a repository object which wraps the GitHub API"""
if self._github_repo is None:
if not _have_github3:
raise ImportError('Must install github3.py')
github_token = github_token or self.github_token
username, reponame = self._parse_github_user_repo()
session = github3.session.GitHubSession(
default_connect_timeout=10,
default_read_timeout=30
)
github = github3.GitHub(session=session)
github.login(token=github_token)
self._github_repo = github.repository(username, reponame)
return self._github_repo
def github_commit(self, sha):
repo = self.as_github_repo()
return repo.commit(sha)
def github_release(self, tag):
repo = self.as_github_repo()
try:
return repo.release_from_tag(tag)
except github3.exceptions.NotFoundError:
return None
def github_upload_asset_requests(self, release, path, name, mime,
max_retries=None, retry_backoff=None):
if max_retries is None:
max_retries = int(os.environ.get('CROSSBOW_MAX_RETRIES', 8))
if retry_backoff is None:
retry_backoff = int(os.environ.get('CROSSBOW_RETRY_BACKOFF', 5))
for i in range(max_retries):
try:
with open(path, 'rb') as fp:
result = release.upload_asset(name=name, asset=fp,
content_type=mime)
except github3.exceptions.ResponseError as e:
logger.error('Attempt {} has failed with message: {}.'
.format(i + 1, str(e)))
logger.error('Error message {}'.format(e.msg))
logger.error('List of errors provided by Github:')
for err in e.errors:
logger.error(' - {}'.format(err))
if e.code == 422:
# 422 Validation Failed, probably raised because
# ReleaseAsset already exists, so try to remove it before
# reattempting the asset upload
for asset in release.assets():
if asset.name == name:
logger.info('Release asset {} already exists, '
'removing it...'.format(name))
asset.delete()
logger.info('Asset {} removed.'.format(name))
break
except github3.exceptions.ConnectionError as e:
logger.error('Attempt {} has failed with message: {}.'
.format(i + 1, str(e)))
else:
logger.info('Attempt {} has finished.'.format(i + 1))
return result
time.sleep(retry_backoff)
raise RuntimeError('Github asset uploading has failed!')
def github_upload_asset_curl(self, release, path, name, mime):
upload_url, _ = release.upload_url.split('{?')
upload_url += '?name={}'.format(name)
command = [
'curl',
'--fail',
'-H', "Authorization: token {}".format(self.github_token),
'-H', "Content-Type: {}".format(mime),
'--data-binary', '@{}'.format(path),
upload_url
]
return subprocess.run(command, shell=False, check=True)
def github_overwrite_release_assets(self, tag_name, target_commitish,
patterns, method='requests'):
        # Since github has changed something on their side, asset uploading
        # via requests became unstable, so prefer the cURL alternative.
# Potential cause:
# sigmavirus24/github3.py/issues/779#issuecomment-379470626
repo = self.as_github_repo()
if not tag_name:
raise CrossbowError('Empty tag name')
if not target_commitish:
raise CrossbowError('Empty target commit for the release tag')
# remove the whole release if it already exists
try:
release = repo.release_from_tag(tag_name)
except github3.exceptions.NotFoundError:
pass
else:
release.delete()
release = repo.create_release(tag_name, target_commitish)
for pattern in patterns:
for path in glob.glob(pattern, recursive=True):
name = os.path.basename(path)
size = os.path.getsize(path)
mime = mimetypes.guess_type(name)[0] or 'application/zip'
logger.info(
'Uploading asset `{}` with mimetype {} and size {}...'
.format(name, mime, size)
)
if method == 'requests':
self.github_upload_asset_requests(release, path, name=name,
mime=mime)
elif method == 'curl':
self.github_upload_asset_curl(release, path, name=name,
mime=mime)
else:
raise CrossbowError(
'Unsupported upload method {}'.format(method)
)
class Queue(Repo):
def _latest_prefix_id(self, prefix):
pattern = re.compile(r'[\w\/-]*{}-(\d+)'.format(prefix))
matches = list(filter(None, map(pattern.match, self.repo.branches)))
if matches:
latest = max(int(m.group(1)) for m in matches)
else:
latest = -1
return latest
def _next_job_id(self, prefix):
"""Auto increments the branch's identifier based on the prefix"""
latest_id = self._latest_prefix_id(prefix)
return '{}-{}'.format(prefix, latest_id + 1)
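    # For example, if the queue already contains branches 'build-1' and
    # 'build-2', _latest_prefix_id('build') returns 2 and
    # _next_job_id('build') returns 'build-3'; on an empty queue the first
    # job id is 'build-0'.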
def latest_for_prefix(self, prefix):
latest_id = self._latest_prefix_id(prefix)
if latest_id < 0:
raise RuntimeError(
'No job has been submitted with prefix {} yet'.format(prefix)
)
job_name = '{}-{}'.format(prefix, latest_id)
return self.get(job_name)
def date_of(self, job):
        # it'd be better to bind to the queue repository on deserialization
        # and move these methods to Job
branch_name = 'origin/{}'.format(job.branch)
branch = self.repo.branches[branch_name]
commit = self.repo[branch.target]
return date.fromtimestamp(commit.commit_time)
def jobs(self, pattern):
"""Return jobs sorted by its identifier in reverse order"""
job_names = []
for name in self.repo.branches.remote:
origin, name = name.split('/', 1)
result = re.match(pattern, name)
if result:
job_names.append(name)
for name in sorted(job_names, reverse=True):
yield self.get(name)
def get(self, job_name):
branch_name = 'origin/{}'.format(job_name)
branch = self.repo.branches[branch_name]
try:
content = self.file_contents(branch.target, 'job.yml')
except KeyError:
raise CrossbowError(
                'No job found with name: {}'.format(job_name)
)
buffer = StringIO(content.decode('utf-8'))
job = yaml.load(buffer)
job.queue = self
return job
def put(self, job, prefix='build'):
if not isinstance(job, Job):
raise CrossbowError('`job` must be an instance of Job')
if job.branch is not None:
raise CrossbowError('`job.branch` is automatically generated, '
'thus it must be blank')
if job.target.remote is None:
raise CrossbowError(
'Cannot determine git remote for the Arrow repository to '
'clone or push to, try to push the `{}` branch first to have '
'a remote tracking counterpart.'.format(job.target.branch)
)
if job.target.branch is None:
raise CrossbowError(
'Cannot determine the current branch of the Arrow repository '
'to clone or push to, perhaps it is in detached HEAD state. '
'Please checkout a branch.'
)
# auto increment and set next job id, e.g. build-85
job._queue = self
job.branch = self._next_job_id(prefix)
# create tasks' branches
for task_name, task in job.tasks.items():
# adding CI's name to the end of the branch in order to use skip
# patterns on travis and circleci
task.branch = '{}-{}-{}'.format(job.branch, task.ci, task_name)
params = {
**job.params,
"arrow": job.target,
"queue_remote_url": self.remote_url
}
files = task.render_files(job.template_searchpath, params=params)
branch = self.create_branch(task.branch, files=files)
self.create_tag(task.tag, branch.target)
task.commit = str(branch.target)
# create job's branch with its description
return self.create_branch(job.branch, files=job.render_files())
def get_version(root, **kwargs):
"""
Parse function for setuptools_scm that ignores tags for non-C++
subprojects, e.g. apache-arrow-js-XXX tags.
"""
from setuptools_scm.git import parse as parse_git_version
# query the calculated version based on the git tags
kwargs['describe_command'] = (
'git describe --dirty --tags --long --match "apache-arrow-[0-9].*"'
)
version = parse_git_version(root, **kwargs)
# increment the minor version, because there can be patch releases created
# from maintenance branches where the tags are unreachable from the
# master's HEAD, so the git command above generates 0.17.0.dev300 even if
    # arrow has a newer 0.17.1 patch release
pattern = r"^(\d+)\.(\d+)\.(\d+)$"
match = re.match(pattern, str(version.tag))
major, minor, patch = map(int, match.groups())
# the bumped version number after 0.17.x will be 0.18.0.dev300
return "{}.{}.{}.dev{}".format(major, minor + 1, patch, version.distance)
class Serializable:
@classmethod
def to_yaml(cls, representer, data):
tag = '!{}'.format(cls.__name__)
dct = {k: v for k, v in data.__dict__.items() if not k.startswith('_')}
return representer.represent_mapping(tag, dct)
class Target(Serializable):
"""
Describes target repository and revision the builds run against
    This serializable data container holds information about arrow's
git remote, branch, sha and version number as well as some metadata
(currently only an email address where the notification should be sent).
"""
def __init__(self, head, branch, remote, version, email=None):
self.head = head
self.email = email
self.branch = branch
self.remote = remote
self.version = version
self.no_rc_version = re.sub(r'-rc\d+\Z', '', version)
# Semantic Versioning 1.0.0: https://semver.org/spec/v1.0.0.html
#
# > A pre-release version number MAY be denoted by appending an
# > arbitrary string immediately following the patch version and a
# > dash. The string MUST be comprised of only alphanumerics plus
# > dash [0-9A-Za-z-].
#
# Example:
#
# '0.16.1.dev10' ->
# '0.16.1-dev10'
self.no_rc_semver_version = \
re.sub(r'\.(dev\d+)\Z', r'-\1', self.no_rc_version)
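    # To illustrate the derived attributes above (hypothetical version
    # strings): version '0.17.0-rc2' yields no_rc_version '0.17.0', and
    # version '0.16.1.dev10' yields no_rc_semver_version '0.16.1-dev10'.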
@classmethod
def from_repo(cls, repo, head=None, branch=None, remote=None, version=None,
email=None):
"""Initialize from a repository
Optionally override detected remote, branch, head, and/or version.
"""
assert isinstance(repo, Repo)
if head is None:
head = str(repo.head.target)
if branch is None:
branch = repo.branch.branch_name
if remote is None:
remote = repo.remote_url
if version is None:
version = get_version(repo.path)
if email is None:
email = repo.user_email
return cls(head=head, email=email, branch=branch, remote=remote,
version=version)
class Task(Serializable):
"""
Describes a build task and metadata required to render CI templates
A task is represented as a single git commit and branch containing jinja2
rendered files (currently appveyor.yml or .travis.yml configurations).
    A task can't be directly submitted to a queue; it must belong to a job.
Each task's unique identifier is its branch name, which is generated after
submitting the job to a queue.
"""
def __init__(self, ci, template, artifacts=None, params=None):
assert ci in {
'circle',
'travis',
'appveyor',
'azure',
'github',
'drone',
}
self.ci = ci
self.template = template
self.artifacts = artifacts or []
self.params = params or {}
self.branch = None # filled after adding to a queue
self.commit = None # filled after adding to a queue
self._queue = None # set by the queue object after put or get
self._status = None # status cache
self._assets = None # assets cache
def render_files(self, searchpath, params=None):
params = {**self.params, **(params or {}), "task": self}
try:
rendered = _render_jinja_template(searchpath, self.template,
params=params)
except jinja2.TemplateError as e:
raise RuntimeError(
'Failed to render template `{}` with {}: {}'.format(
self.template, e.__class__.__name__, str(e)
)
)
tree = {**_default_tree, self.filename: rendered}
return _unflatten_tree(tree)
@property
def tag(self):
return self.branch
@property
def filename(self):
config_files = {
'circle': '.circleci/config.yml',
'travis': '.travis.yml',
'appveyor': 'appveyor.yml',
'azure': 'azure-pipelines.yml',
'github': '.github/workflows/crossbow.yml',
'drone': '.drone.yml',
}
return config_files[self.ci]
def status(self, force_query=False):
_status = getattr(self, '_status', None)
if force_query or _status is None:
github_commit = self._queue.github_commit(self.commit)
self._status = TaskStatus(github_commit)
return self._status
def assets(self, force_query=False):
_assets = getattr(self, '_assets', None)
if force_query or _assets is None:
github_release = self._queue.github_release(self.tag)
self._assets = TaskAssets(github_release,
artifact_patterns=self.artifacts)
return self._assets
class TaskStatus:
"""
Combine the results from status and checks API to a single state.
    Azure Pipelines uses the checks API, which doesn't provide a combined
    interface like the status API does, so we need to manually combine both
    the commit statuses and the commit checks coming from different API
    endpoints.
Status.state: error, failure, pending or success, default pending
CheckRun.status: queued, in_progress or completed, default: queued
CheckRun.conclusion: success, failure, neutral, cancelled, timed_out
or action_required, only set if
CheckRun.status == 'completed'
1. Convert CheckRun's status and conclusion to one of Status.state
2. Merge the states based on the following rules:
- failure if any of the contexts report as error or failure
- pending if there are no statuses or a context is pending
- success if the latest status for all contexts is success
       - error otherwise
Parameters
----------
commit : github3.Commit
Commit to query the combined status for.
Returns
-------
TaskStatus(
combined_state='error|failure|pending|success',
github_status='original github status object',
github_check_runs='github checks associated with the commit',
total_count='number of statuses and checks'
)
"""
def __init__(self, commit):
status = commit.status()
check_runs = list(commit.check_runs())
states = [s.state for s in status.statuses]
for check in check_runs:
if check.status == 'completed':
if check.conclusion in {'success', 'failure'}:
states.append(check.conclusion)
elif check.conclusion in {'cancelled', 'timed_out',
'action_required'}:
states.append('error')
# omit `neutral` conclusion
else:
states.append('pending')
        # it could be more efficient, but the following is more descriptive
combined_state = 'error'
if len(states):
if any(state in {'error', 'failure'} for state in states):
combined_state = 'failure'
elif any(state == 'pending' for state in states):
combined_state = 'pending'
elif all(state == 'success' for state in states):
combined_state = 'success'
        # show links to the actual builds; some of the CI providers implement
        # the statuses API, others implement the checks API, so display both
build_links = [s.target_url for s in status.statuses]
build_links += [c.html_url for c in check_runs]
self.combined_state = combined_state
self.github_status = status
self.github_check_runs = check_runs
self.total_count = len(states)
self.build_links = build_links
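    # A few examples of the merge rules above: collected states
    # ['success', 'pending'] combine to 'pending', ['success', 'failure']
    # combine to 'failure', ['success', 'success'] to 'success', and an
    # empty list keeps the default 'error'.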
class TaskAssets(dict):
def __init__(self, github_release, artifact_patterns):
        # HACK(kszucs): don't expect uploaded assets if no artifacts were
        # defined for the task, in order to spare a bit of github rate limit
if not artifact_patterns:
return
if github_release is None:
github_assets = {} # no assets have been uploaded for the task
else:
github_assets = {a.name: a for a in github_release.assets()}
for pattern in artifact_patterns:
# artifact can be a regex pattern
compiled = re.compile(pattern)
matches = list(
filter(None, map(compiled.match, github_assets.keys()))
)
num_matches = len(matches)
# validate artifact pattern matches single asset
if num_matches == 0:
self[pattern] = None
elif num_matches == 1:
self[pattern] = github_assets[matches[0].group(0)]
else:
                raise CrossbowError(
                    'Only a single asset should match pattern `{}`, there '
                    'are multiple ones: {}'.format(
                        pattern, ', '.join(m.group(0) for m in matches)
                    )
                )
def missing_patterns(self):
return [pattern for pattern, asset in self.items() if asset is None]
def uploaded_assets(self):
return [asset for asset in self.values() if asset is not None]
class Job(Serializable):
"""Describes multiple tasks against a single target repository"""
def __init__(self, target, tasks, params=None, template_searchpath=None):
if not tasks:
raise ValueError('no tasks were provided for the job')
if not all(isinstance(task, Task) for task in tasks.values()):
            raise ValueError('each value in `tasks` must be an instance '
                             'of Task')
if not isinstance(target, Target):
raise ValueError('`target` must be an instance of Target')
        if params is not None and not isinstance(params, dict):
raise ValueError('`params` must be an instance of dict')
self.target = target
self.tasks = tasks
self.params = params or {} # additional parameters for the tasks
self.branch = None # filled after adding to a queue
self._queue = None # set by the queue object after put or get
if template_searchpath is None:
self._template_searchpath = ArrowSources.find().path
else:
self._template_searchpath = template_searchpath
def render_files(self):
with StringIO() as buf:
yaml.dump(self, buf)
content = buf.getvalue()
tree = {**_default_tree, "job.yml": content}
return _unflatten_tree(tree)
def render_tasks(self, params=None):
result = {}
params = {
**self.params,
"arrow": self.target,
**(params or {})
}
for task_name, task in self.tasks.items():
files = task.render_files(self._template_searchpath, params)
result[task_name] = files
return result
@property
def template_searchpath(self):
return self._template_searchpath
@property
def queue(self):
assert isinstance(self._queue, Queue)
return self._queue
@queue.setter
def queue(self, queue):
assert isinstance(queue, Queue)
self._queue = queue
for task in self.tasks.values():
task._queue = queue
@property
def email(self):
return os.environ.get('CROSSBOW_EMAIL', self.target.email)
@property
def date(self):
return self.queue.date_of(self)
def show(self, stream=None):
return yaml.dump(self, stream=stream)
@classmethod
def from_config(cls, config, target, tasks=None, groups=None, params=None):
"""
        Instantiate a job based on a config.
Parameters
----------
config : dict
Deserialized content of tasks.yml
target : Target
Describes target repository and revision the builds run against.
tasks : Optional[List[str]], default None
List of glob patterns for matching task names.
groups : Optional[List[str]], default None
List of exact group names matching predefined task sets in the
config.
params : Optional[Dict[str, str]], default None
Additional rendering parameters for the task templates.
Returns
-------
Job
Raises
------
Exception:
            If invalid groups or tasks have been passed.
"""
task_definitions = config.select(tasks, groups=groups)
# instantiate the tasks
tasks = {}
versions = {'version': target.version,
'no_rc_version': target.no_rc_version,
'no_rc_semver_version': target.no_rc_semver_version}
for task_name, task in task_definitions.items():
artifacts = task.pop('artifacts', None) or [] # because of yaml
artifacts = [fn.format(**versions) for fn in artifacts]
tasks[task_name] = Task(artifacts=artifacts, **task)
return cls(target=target, tasks=tasks, params=params,
template_searchpath=config.template_searchpath)
def is_finished(self):
for task in self.tasks.values():
status = task.status(force_query=True)
if status.combined_state == 'pending':
return False
return True
def wait_until_finished(self, poll_max_minutes=120,
poll_interval_minutes=10):
started_at = time.time()
while True:
if self.is_finished():
break
waited_for_minutes = (time.time() - started_at) / 60
if waited_for_minutes > poll_max_minutes:
msg = ('Exceeded the maximum amount of time waiting for job '
'to finish, waited for {} minutes.')
raise RuntimeError(msg.format(waited_for_minutes))
logger.info('Waiting {} minutes and then checking again'
.format(poll_interval_minutes))
time.sleep(poll_interval_minutes * 60)
class Config(dict):
def __init__(self, tasks, template_searchpath):
super().__init__(tasks)
self.template_searchpath = template_searchpath
@classmethod
def load_yaml(cls, path):
path = Path(path)
searchpath = path.parent
rendered = _render_jinja_template(searchpath, template=path.name,
params={})
config = yaml.load(rendered)
return cls(config, template_searchpath=searchpath)
def show(self, stream=None):
return yaml.dump(dict(self), stream=stream)
def select(self, tasks=None, groups=None):
config_groups = dict(self['groups'])
config_tasks = dict(self['tasks'])
valid_groups = set(config_groups.keys())
valid_tasks = set(config_tasks.keys())
group_whitelist = list(groups or [])
task_whitelist = list(tasks or [])
# validate that the passed groups are defined in the config
requested_groups = set(group_whitelist)
invalid_groups = requested_groups - valid_groups
if invalid_groups:
msg = 'Invalid group(s) {!r}. Must be one of {!r}'.format(
invalid_groups, valid_groups
)
raise CrossbowError(msg)
# merge the tasks defined in the selected groups
task_patterns = [list(config_groups[name]) for name in group_whitelist]
task_patterns = set(sum(task_patterns, task_whitelist))
# treat the task names as glob patterns to select tasks more easily
requested_tasks = set()
for pattern in task_patterns:
matches = fnmatch.filter(valid_tasks, pattern)
if len(matches):
requested_tasks.update(matches)
else:
raise CrossbowError(
"Unable to match any tasks for `{}`".format(pattern)
)
# validate that the passed and matched tasks are defined in the config
invalid_tasks = requested_tasks - valid_tasks
if invalid_tasks:
msg = 'Invalid task(s) {!r}. Must be one of {!r}'.format(
invalid_tasks, valid_tasks
)
raise CrossbowError(msg)
return {
task_name: config_tasks[task_name] for task_name in requested_tasks
}
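    # As an example (hypothetical configuration): with
    # groups: {'nightly': ['wheel-*']} and tasks keyed 'wheel-linux' and
    # 'wheel-win', both select(groups=['nightly']) and
    # select(tasks=['wheel-*']) return the two wheel task definitions, while
    # select(tasks=['conda-*']) raises CrossbowError because the pattern
    # matches nothing.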
def validate(self):
        # validate that the task groups are properly referencing the tasks
for group_name, group in self['groups'].items():
for pattern in group:
tasks = self.select(tasks=[pattern])
if not tasks:
raise CrossbowError(
"The pattern `{}` defined for task group `{}` is not "
"matching any of the tasks defined in the "
"configuration file.".format(pattern, group_name)
)
# validate that the tasks are constructible
for task_name, task in self['tasks'].items():
try:
Task(**task)
except Exception as e:
raise CrossbowError(
'Unable to construct a task object from the '
'definition of task `{}`. The original error message '
'is: `{}`'.format(task_name, str(e))
)
        # validate that the defined tasks are renderable; in order to do
        # that, define the required objects with dummy data
target = Target(
head='e279a7e06e61c14868ca7d71dea795420aea6539',
branch='master',
remote='https://github.com/apache/arrow',
version='1.0.0dev123',
email='dummy@example.ltd'
)
for task_name, task in self['tasks'].items():
task = Task(**task)
files = task.render_files(
self.template_searchpath,
params=dict(
arrow=target,
queue_remote_url='https://github.com/org/crossbow'
)
)
if not files:
raise CrossbowError('No files have been rendered for task `{}`'
.format(task_name))
# configure yaml serializer
yaml = YAML()
yaml.register_class(Job)
yaml.register_class(Task)
yaml.register_class(Target)
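# A minimal end-to-end sketch of how the classes above fit together; the
# repository paths, token placeholder and task pattern below are assumptions
# for illustration, not values shipped with this module:
#
#   arrow = Repo('/path/to/arrow')
#   queue = Queue('/path/to/crossbow', github_token='<token>',
#                 require_https=True)
#   config = Config.load_yaml('/path/to/arrow/dev/tasks/tasks.yml')
#   target = Target.from_repo(arrow)
#   job = Job.from_config(config, target=target, tasks=['wheel-*'])
#   queue.put(job, prefix='build')   # renders task branches onto the queue
#   queue.push()                     # pushes the new refs to GitHub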