| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os |
| import re |
| import fnmatch |
| import glob |
| import time |
| import logging |
| import mimetypes |
| import subprocess |
| import textwrap |
| from io import StringIO |
| from pathlib import Path |
| from datetime import date |
| |
| import jinja2 |
| from ruamel.yaml import YAML |
| |
| try: |
| import github3 |
| _have_github3 = True |
| except ImportError: |
| github3 = object |
| _have_github3 = False |
| |
| try: |
| import pygit2 |
| except ImportError: |
| PygitRemoteCallbacks = object |
| else: |
| PygitRemoteCallbacks = pygit2.RemoteCallbacks |
| |
| from ..utils.source import ArrowSources |
| |
| |
| for pkg in ["requests", "urllib3", "github3"]: |
| logging.getLogger(pkg).setLevel(logging.WARNING) |
| |
| logger = logging.getLogger("crossbow") |
| |
| |
| class CrossbowError(Exception): |
| pass |
| |
| |
| def _flatten(mapping): |
| """Converts a hierarchical mapping to a flat dictionary""" |
| result = {} |
| for k, v in mapping.items(): |
| if isinstance(v, dict): |
| for ik, iv in _flatten(v).items(): |
| ik = ik if isinstance(ik, tuple) else (ik,) |
| result[(k,) + ik] = iv |
        elif isinstance(v, list):
            # list items are keyed by their position in the list
            for ik, iv in enumerate(v):
                result[(k, ik)] = iv
| else: |
| result[(k,)] = v |
| return result |
| |
| |
| def _unflatten(mapping): |
| """Converts a flat tuple => object mapping to hierarchical one""" |
| result = {} |
| for path, value in mapping.items(): |
| parents, leaf = path[:-1], path[-1] |
| # create the hierarchy until we reach the leaf value |
| temp = result |
| for parent in parents: |
| temp.setdefault(parent, {}) |
| temp = temp[parent] |
| # set the leaf value |
| temp[leaf] = value |
| |
| return result |
| |
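# A sketch of the round trip between the two helpers above (illustrative):
#
#   _flatten({'a': {'b': 1}, 'c': [3, 4]})
#   # -> {('a', 'b'): 1, ('c', 0): 3, ('c', 1): 4}
#   _unflatten({('a', 'b'): 1, ('c',): 2})
#   # -> {'a': {'b': 1}, 'c': 2}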
| |
| def _unflatten_tree(files): |
| """Converts a flat path => object mapping to a hierarchical directories |
| |
| Input: |
| { |
| 'path/to/file.a': a_content, |
| 'path/to/file.b': b_content, |
| 'path/file.c': c_content |
| } |
| Output: |
| { |
| 'path': { |
| 'to': { |
| 'file.a': a_content, |
| 'file.b': b_content |
| }, |
| 'file.c': c_content |
| } |
| } |
| """ |
| files = {tuple(k.split('/')): v for k, v in files.items()} |
| return _unflatten(files) |
| |
| |
| def _render_jinja_template(searchpath, template, params): |
| def format_all(items, pattern): |
| return [pattern.format(item) for item in items] |
| |
| loader = jinja2.FileSystemLoader(searchpath) |
| env = jinja2.Environment(loader=loader, trim_blocks=True, |
| lstrip_blocks=True, |
| undefined=jinja2.StrictUndefined) |
| env.filters['format_all'] = format_all |
| template = env.get_template(template) |
| return template.render(**params) |
| |
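# The custom ``format_all`` filter formats every item of a list with the
# given pattern; a hypothetical template snippet (not from the repository)
# could use it like:
#
#   {% for path in artifacts | format_all('dist/{}') %}
#   - {{ path }}
#   {% endfor %}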
| |
| # configurations for setting up branch skipping |
| # - appveyor has a feature to skip builds without an appveyor.yml |
| # - travis reads from the master branch and applies the rules |
# - circle requires the configuration to be present on all branches, even
| # that are configured to be skipped |
| # - azure skips branches without azure-pipelines.yml by default |
| # - github skips branches without .github/workflows/ by default |
| |
| _default_travis_yml = """ |
| branches: |
| only: |
| - master |
| - /.*-travis-.*/ |
| |
| os: linux |
| dist: trusty |
| language: generic |
| """ |
| |
| _default_circle_yml = """ |
| version: 2 |
| |
| jobs: |
| build: |
| machine: true |
| |
| workflows: |
| version: 2 |
| build: |
| jobs: |
| - build: |
| filters: |
| branches: |
| only: |
| - /.*-circle-.*/ |
| """ |
| |
| _default_tree = { |
| '.travis.yml': _default_travis_yml, |
| '.circleci/config.yml': _default_circle_yml |
| } |
| |
| |
| class GitRemoteCallbacks(PygitRemoteCallbacks): |
| |
| def __init__(self, token): |
| self.token = token |
| self.attempts = 0 |
| super().__init__() |
| |
| def push_update_reference(self, refname, message): |
| pass |
| |
| def update_tips(self, refname, old, new): |
| pass |
| |
| def credentials(self, url, username_from_url, allowed_types): |
        # libgit2 retries the authentication indefinitely (a known bug), so
        # cap the number of attempts
| self.attempts += 1 |
| |
| if self.attempts >= 5: |
| # pygit2 doesn't propagate the exception properly |
| msg = 'Wrong oauth personal access token' |
| print(msg) |
| raise CrossbowError(msg) |
| |
| if allowed_types & pygit2.credentials.GIT_CREDTYPE_USERPASS_PLAINTEXT: |
| return pygit2.UserPass(self.token, 'x-oauth-basic') |
| else: |
| return None |
| |
| |
| def _git_ssh_to_https(url): |
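    # e.g. 'git@github.com:apache/arrow.git'
    #   -> 'https://github.com/apache/arrow.git'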
| return url.replace('git@github.com:', 'https://github.com/') |
| |
| |
| class Repo: |
| """ |
| Base class for interaction with local git repositories |
| |
| A high level wrapper used for both reading revision information from |
| arrow's repository and pushing continuous integration tasks to the queue |
| repository. |
| |
| Parameters |
| ---------- |
| require_https : boolean, default False |
| Raise exception for SSH origin URLs |
| """ |
| |
| def __init__(self, path, github_token=None, remote_url=None, |
| require_https=False): |
| self.path = Path(path) |
| self.github_token = github_token |
| self.require_https = require_https |
| self._remote_url = remote_url |
| self._pygit_repo = None |
| self._github_repo = None # set by as_github_repo() |
| self._updated_refs = [] |
| |
| def __str__(self): |
| tpl = textwrap.dedent(''' |
| Repo: {remote}@{branch} |
| Commit: {head} |
| ''') |
| return tpl.format( |
| remote=self.remote_url, |
| branch=self.branch.branch_name, |
| head=self.head |
| ) |
| |
| @property |
| def repo(self): |
| if self._pygit_repo is None: |
| self._pygit_repo = pygit2.Repository(str(self.path)) |
| return self._pygit_repo |
| |
| @property |
| def origin(self): |
| remote = self.repo.remotes['origin'] |
| if self.require_https and remote.url.startswith('git@github.com'): |
| raise CrossbowError("Change SSH origin URL to HTTPS to use " |
| "Crossbow: {}".format(remote.url)) |
| return remote |
| |
| def fetch(self): |
| refspec = '+refs/heads/*:refs/remotes/origin/*' |
| self.origin.fetch([refspec]) |
| |
| def push(self, refs=None, github_token=None): |
| github_token = github_token or self.github_token |
| if github_token is None: |
| raise RuntimeError( |
| 'Could not determine GitHub token. Please set the ' |
| 'CROSSBOW_GITHUB_TOKEN environment variable to a ' |
| 'valid GitHub access token or pass one to --github-token.' |
| ) |
| callbacks = GitRemoteCallbacks(github_token) |
| refs = refs or [] |
| try: |
| self.origin.push(refs + self._updated_refs, callbacks=callbacks) |
| except pygit2.GitError: |
| raise RuntimeError('Failed to push updated references, ' |
| 'potentially because of credential issues: {}' |
| .format(self._updated_refs)) |
| else: |
            self._updated_refs = []
| |
| @property |
| def head(self): |
| """Currently checked out commit's sha""" |
| return self.repo.head |
| |
| @property |
| def branch(self): |
| """Currently checked out branch""" |
| try: |
| return self.repo.branches[self.repo.head.shorthand] |
| except KeyError: |
| return None # detached |
| |
| @property |
| def remote(self): |
| """Currently checked out branch's remote counterpart""" |
| try: |
| return self.repo.remotes[self.branch.upstream.remote_name] |
| except (AttributeError, KeyError): |
| return None # cannot detect |
| |
| @property |
| def remote_url(self): |
| """Currently checked out branch's remote counterpart URL |
| |
        If an SSH GitHub URL is set, it will be replaced by the HTTPS
        equivalent, which is usable with a GitHub OAuth token.
| """ |
| try: |
| return self._remote_url or _git_ssh_to_https(self.remote.url) |
| except AttributeError: |
| return None |
| |
| @property |
| def user_name(self): |
| try: |
| return next(self.repo.config.get_multivar('user.name')) |
| except StopIteration: |
| return os.environ.get('GIT_COMMITTER_NAME', 'unknown') |
| |
| @property |
| def user_email(self): |
| try: |
| return next(self.repo.config.get_multivar('user.email')) |
| except StopIteration: |
| return os.environ.get('GIT_COMMITTER_EMAIL', 'unknown') |
| |
| @property |
| def signature(self): |
| return pygit2.Signature(self.user_name, self.user_email, |
| int(time.time())) |
| |
| def create_tree(self, files): |
| builder = self.repo.TreeBuilder() |
| |
| for filename, content in files.items(): |
| if isinstance(content, dict): |
| # create a subtree |
| tree_id = self.create_tree(content) |
| builder.insert(filename, tree_id, pygit2.GIT_FILEMODE_TREE) |
| else: |
| # create a file |
| blob_id = self.repo.create_blob(content) |
| builder.insert(filename, blob_id, pygit2.GIT_FILEMODE_BLOB) |
| |
| tree_id = builder.write() |
| return tree_id |
| |
| def create_commit(self, files, parents=None, message='', |
| reference_name=None): |
| if parents is None: |
            # by default use the master branch as the base of the new branch,
            # which is required to reuse the github actions cache across
            # crossbow tasks
| commit, _ = self.repo.resolve_refish("master") |
| parents = [commit.id] |
| tree_id = self.create_tree(files) |
| |
| author = committer = self.signature |
| commit_id = self.repo.create_commit(reference_name, author, committer, |
| message, tree_id, parents) |
| return self.repo[commit_id] |
| |
| def create_branch(self, branch_name, files, parents=None, message='', |
| signature=None): |
| # create commit with the passed tree |
| commit = self.create_commit(files, parents=parents, message=message) |
| |
| # create branch pointing to the previously created commit |
| branch = self.repo.create_branch(branch_name, commit) |
| |
| # append to the pushable references |
| self._updated_refs.append('refs/heads/{}'.format(branch_name)) |
| |
| return branch |
| |
| def create_tag(self, tag_name, commit_id, message=''): |
| tag_id = self.repo.create_tag(tag_name, commit_id, |
| pygit2.GIT_OBJ_COMMIT, self.signature, |
| message) |
| |
| # append to the pushable references |
| self._updated_refs.append('refs/tags/{}'.format(tag_name)) |
| |
| return self.repo[tag_id] |
| |
| def file_contents(self, commit_id, file): |
| commit = self.repo[commit_id] |
| entry = commit.tree[file] |
| blob = self.repo[entry.id] |
| return blob.data |
| |
| def _parse_github_user_repo(self): |
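        # e.g. 'https://github.com/apache/arrow' -> ('apache', 'arrow')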
| m = re.match(r'.*\/([^\/]+)\/([^\/\.]+)(\.git)?$', self.remote_url) |
| if m is None: |
| raise CrossbowError( |
| "Unable to parse the github owner and repository from the " |
| "repository's remote url '{}'".format(self.remote_url) |
| ) |
| user, repo = m.group(1), m.group(2) |
| return user, repo |
| |
| def as_github_repo(self, github_token=None): |
| """Converts it to a repository object which wraps the GitHub API""" |
| if self._github_repo is None: |
| if not _have_github3: |
| raise ImportError('Must install github3.py') |
| github_token = github_token or self.github_token |
| username, reponame = self._parse_github_user_repo() |
| session = github3.session.GitHubSession( |
| default_connect_timeout=10, |
| default_read_timeout=30 |
| ) |
| github = github3.GitHub(session=session) |
| github.login(token=github_token) |
| self._github_repo = github.repository(username, reponame) |
| return self._github_repo |
| |
| def github_commit(self, sha): |
| repo = self.as_github_repo() |
| return repo.commit(sha) |
| |
| def github_release(self, tag): |
| repo = self.as_github_repo() |
| try: |
| return repo.release_from_tag(tag) |
| except github3.exceptions.NotFoundError: |
| return None |
| |
| def github_upload_asset_requests(self, release, path, name, mime, |
| max_retries=None, retry_backoff=None): |
| if max_retries is None: |
| max_retries = int(os.environ.get('CROSSBOW_MAX_RETRIES', 8)) |
| if retry_backoff is None: |
| retry_backoff = int(os.environ.get('CROSSBOW_RETRY_BACKOFF', 5)) |
| |
| for i in range(max_retries): |
| try: |
| with open(path, 'rb') as fp: |
| result = release.upload_asset(name=name, asset=fp, |
| content_type=mime) |
| except github3.exceptions.ResponseError as e: |
| logger.error('Attempt {} has failed with message: {}.' |
| .format(i + 1, str(e))) |
| logger.error('Error message {}'.format(e.msg)) |
| logger.error('List of errors provided by Github:') |
| for err in e.errors: |
| logger.error(' - {}'.format(err)) |
| |
| if e.code == 422: |
| # 422 Validation Failed, probably raised because |
| # ReleaseAsset already exists, so try to remove it before |
| # reattempting the asset upload |
| for asset in release.assets(): |
| if asset.name == name: |
| logger.info('Release asset {} already exists, ' |
| 'removing it...'.format(name)) |
| asset.delete() |
| logger.info('Asset {} removed.'.format(name)) |
| break |
| except github3.exceptions.ConnectionError as e: |
| logger.error('Attempt {} has failed with message: {}.' |
| .format(i + 1, str(e))) |
| else: |
| logger.info('Attempt {} has finished.'.format(i + 1)) |
| return result |
| |
| time.sleep(retry_backoff) |
| |
| raise RuntimeError('Github asset uploading has failed!') |
| |
| def github_upload_asset_curl(self, release, path, name, mime): |
| upload_url, _ = release.upload_url.split('{?') |
| upload_url += '?name={}'.format(name) |
| |
| command = [ |
| 'curl', |
| '--fail', |
| '-H', "Authorization: token {}".format(self.github_token), |
| '-H', "Content-Type: {}".format(mime), |
| '--data-binary', '@{}'.format(path), |
| upload_url |
| ] |
| return subprocess.run(command, shell=False, check=True) |
| |
| def github_overwrite_release_assets(self, tag_name, target_commitish, |
| patterns, method='requests'): |
        # Since GitHub changed something on their end, asset uploading via
        # requests has become unstable, so prefer the cURL alternative.
        # Potential cause:
        #    sigmavirus24/github3.py/issues/779#issuecomment-379470626
| repo = self.as_github_repo() |
| if not tag_name: |
| raise CrossbowError('Empty tag name') |
| if not target_commitish: |
| raise CrossbowError('Empty target commit for the release tag') |
| |
| # remove the whole release if it already exists |
| try: |
| release = repo.release_from_tag(tag_name) |
| except github3.exceptions.NotFoundError: |
| pass |
| else: |
| release.delete() |
| |
| release = repo.create_release(tag_name, target_commitish) |
| for pattern in patterns: |
| for path in glob.glob(pattern, recursive=True): |
| name = os.path.basename(path) |
| size = os.path.getsize(path) |
| mime = mimetypes.guess_type(name)[0] or 'application/zip' |
| |
| logger.info( |
| 'Uploading asset `{}` with mimetype {} and size {}...' |
| .format(name, mime, size) |
| ) |
| |
| if method == 'requests': |
| self.github_upload_asset_requests(release, path, name=name, |
| mime=mime) |
| elif method == 'curl': |
| self.github_upload_asset_curl(release, path, name=name, |
| mime=mime) |
| else: |
| raise CrossbowError( |
| 'Unsupported upload method {}'.format(method) |
| ) |
| |
| |
| class Queue(Repo): |
| |
| def _latest_prefix_id(self, prefix): |
| pattern = re.compile(r'[\w\/-]*{}-(\d+)'.format(prefix)) |
| matches = list(filter(None, map(pattern.match, self.repo.branches))) |
| if matches: |
| latest = max(int(m.group(1)) for m in matches) |
| else: |
| latest = -1 |
| return latest |
| |
| def _next_job_id(self, prefix): |
| """Auto increments the branch's identifier based on the prefix""" |
| latest_id = self._latest_prefix_id(prefix) |
| return '{}-{}'.format(prefix, latest_id + 1) |
| |
| def latest_for_prefix(self, prefix): |
| latest_id = self._latest_prefix_id(prefix) |
| if latest_id < 0: |
| raise RuntimeError( |
| 'No job has been submitted with prefix {} yet'.format(prefix) |
| ) |
| job_name = '{}-{}'.format(prefix, latest_id) |
| return self.get(job_name) |
| |
| def date_of(self, job): |
        # it'd be better to bind to the queue repository on deserialization
        # and move these methods to Job
| branch_name = 'origin/{}'.format(job.branch) |
| branch = self.repo.branches[branch_name] |
| commit = self.repo[branch.target] |
| return date.fromtimestamp(commit.commit_time) |
| |
| def jobs(self, pattern): |
| """Return jobs sorted by its identifier in reverse order""" |
| job_names = [] |
| for name in self.repo.branches.remote: |
| origin, name = name.split('/', 1) |
| result = re.match(pattern, name) |
| if result: |
| job_names.append(name) |
| |
| for name in sorted(job_names, reverse=True): |
| yield self.get(name) |
| |
| def get(self, job_name): |
| branch_name = 'origin/{}'.format(job_name) |
| branch = self.repo.branches[branch_name] |
| try: |
| content = self.file_contents(branch.target, 'job.yml') |
| except KeyError: |
| raise CrossbowError( |
                'No job found with name: {}'.format(job_name)
| ) |
| |
| buffer = StringIO(content.decode('utf-8')) |
| job = yaml.load(buffer) |
| job.queue = self |
| return job |
| |
| def put(self, job, prefix='build'): |
| if not isinstance(job, Job): |
| raise CrossbowError('`job` must be an instance of Job') |
| if job.branch is not None: |
| raise CrossbowError('`job.branch` is automatically generated, ' |
| 'thus it must be blank') |
| |
| if job.target.remote is None: |
| raise CrossbowError( |
| 'Cannot determine git remote for the Arrow repository to ' |
| 'clone or push to, try to push the `{}` branch first to have ' |
| 'a remote tracking counterpart.'.format(job.target.branch) |
| ) |
| if job.target.branch is None: |
| raise CrossbowError( |
| 'Cannot determine the current branch of the Arrow repository ' |
| 'to clone or push to, perhaps it is in detached HEAD state. ' |
| 'Please checkout a branch.' |
| ) |
| |
| # auto increment and set next job id, e.g. build-85 |
| job._queue = self |
| job.branch = self._next_job_id(prefix) |
| |
| # create tasks' branches |
| for task_name, task in job.tasks.items(): |
            # append the CI provider's name to the end of the branch name in
            # order to use the skip patterns on travis and circleci
| task.branch = '{}-{}-{}'.format(job.branch, task.ci, task_name) |
| params = { |
| **job.params, |
| "arrow": job.target, |
| "queue_remote_url": self.remote_url |
| } |
| files = task.render_files(job.template_searchpath, params=params) |
| branch = self.create_branch(task.branch, files=files) |
| self.create_tag(task.tag, branch.target) |
| task.commit = str(branch.target) |
| |
| # create job's branch with its description |
| return self.create_branch(job.branch, files=job.render_files()) |
| |
| |
| def get_version(root, **kwargs): |
| """ |
| Parse function for setuptools_scm that ignores tags for non-C++ |
| subprojects, e.g. apache-arrow-js-XXX tags. |
| """ |
| from setuptools_scm.git import parse as parse_git_version |
| |
| # query the calculated version based on the git tags |
| kwargs['describe_command'] = ( |
| 'git describe --dirty --tags --long --match "apache-arrow-[0-9].*"' |
| ) |
| version = parse_git_version(root, **kwargs) |
| |
    # increment the minor version, because there can be patch releases created
    # from maintenance branches where the tags are unreachable from
    # master's HEAD, so the git command above generates 0.17.0.dev300 even if
    # arrow has a newer 0.17.1 patch release
| pattern = r"^(\d+)\.(\d+)\.(\d+)$" |
| match = re.match(pattern, str(version.tag)) |
| major, minor, patch = map(int, match.groups()) |
| |
| # the bumped version number after 0.17.x will be 0.18.0.dev300 |
| return "{}.{}.{}.dev{}".format(major, minor + 1, patch, version.distance) |
| |
| |
| class Serializable: |
| |
| @classmethod |
| def to_yaml(cls, representer, data): |
| tag = '!{}'.format(cls.__name__) |
| dct = {k: v for k, v in data.__dict__.items() if not k.startswith('_')} |
| return representer.represent_mapping(tag, dct) |
| |
| |
| class Target(Serializable): |
| """ |
| Describes target repository and revision the builds run against |
| |
    This serializable data container holds information about arrow's
    git remote, branch, sha and version number, as well as some metadata
    (currently only an email address where the notification should be sent).
| """ |
| |
| def __init__(self, head, branch, remote, version, email=None): |
| self.head = head |
| self.email = email |
| self.branch = branch |
| self.remote = remote |
| self.version = version |
| self.no_rc_version = re.sub(r'-rc\d+\Z', '', version) |
| # Semantic Versioning 1.0.0: https://semver.org/spec/v1.0.0.html |
| # |
| # > A pre-release version number MAY be denoted by appending an |
| # > arbitrary string immediately following the patch version and a |
| # > dash. The string MUST be comprised of only alphanumerics plus |
| # > dash [0-9A-Za-z-]. |
| # |
| # Example: |
| # |
| # '0.16.1.dev10' -> |
| # '0.16.1-dev10' |
| self.no_rc_semver_version = \ |
| re.sub(r'\.(dev\d+)\Z', r'-\1', self.no_rc_version) |
| |
| @classmethod |
| def from_repo(cls, repo, head=None, branch=None, remote=None, version=None, |
| email=None): |
| """Initialize from a repository |
| |
| Optionally override detected remote, branch, head, and/or version. |
| """ |
| assert isinstance(repo, Repo) |
| |
| if head is None: |
| head = str(repo.head.target) |
| if branch is None: |
| branch = repo.branch.branch_name |
| if remote is None: |
| remote = repo.remote_url |
| if version is None: |
| version = get_version(repo.path) |
| if email is None: |
| email = repo.user_email |
| |
| return cls(head=head, email=email, branch=branch, remote=remote, |
| version=version) |
| |
| |
| class Task(Serializable): |
| """ |
| Describes a build task and metadata required to render CI templates |
| |
| A task is represented as a single git commit and branch containing jinja2 |
| rendered files (currently appveyor.yml or .travis.yml configurations). |
| |
    A task can't be directly submitted to a queue; it must belong to a job.
| Each task's unique identifier is its branch name, which is generated after |
| submitting the job to a queue. |
| """ |
| |
| def __init__(self, ci, template, artifacts=None, params=None): |
| assert ci in { |
| 'circle', |
| 'travis', |
| 'appveyor', |
| 'azure', |
| 'github', |
| 'drone', |
| } |
| self.ci = ci |
| self.template = template |
| self.artifacts = artifacts or [] |
| self.params = params or {} |
| self.branch = None # filled after adding to a queue |
| self.commit = None # filled after adding to a queue |
| self._queue = None # set by the queue object after put or get |
| self._status = None # status cache |
| self._assets = None # assets cache |
| |
| def render_files(self, searchpath, params=None): |
| params = {**self.params, **(params or {}), "task": self} |
| try: |
| rendered = _render_jinja_template(searchpath, self.template, |
| params=params) |
| except jinja2.TemplateError as e: |
| raise RuntimeError( |
| 'Failed to render template `{}` with {}: {}'.format( |
| self.template, e.__class__.__name__, str(e) |
| ) |
| ) |
| |
| tree = {**_default_tree, self.filename: rendered} |
| return _unflatten_tree(tree) |
| |
| @property |
| def tag(self): |
| return self.branch |
| |
| @property |
| def filename(self): |
| config_files = { |
| 'circle': '.circleci/config.yml', |
| 'travis': '.travis.yml', |
| 'appveyor': 'appveyor.yml', |
| 'azure': 'azure-pipelines.yml', |
| 'github': '.github/workflows/crossbow.yml', |
| 'drone': '.drone.yml', |
| } |
| return config_files[self.ci] |
| |
| def status(self, force_query=False): |
| _status = getattr(self, '_status', None) |
| if force_query or _status is None: |
| github_commit = self._queue.github_commit(self.commit) |
| self._status = TaskStatus(github_commit) |
| return self._status |
| |
| def assets(self, force_query=False): |
| _assets = getattr(self, '_assets', None) |
| if force_query or _assets is None: |
| github_release = self._queue.github_release(self.tag) |
| self._assets = TaskAssets(github_release, |
| artifact_patterns=self.artifacts) |
| return self._assets |
| |
| |
| class TaskStatus: |
| """ |
| Combine the results from status and checks API to a single state. |
| |
    Azure pipelines uses the checks API, which doesn't provide a combined
    interface like the status API does, so we need to manually combine both
    the commit statuses and the commit checks coming from different API
    endpoints.
| |
| Status.state: error, failure, pending or success, default pending |
| CheckRun.status: queued, in_progress or completed, default: queued |
| CheckRun.conclusion: success, failure, neutral, cancelled, timed_out |
| or action_required, only set if |
| CheckRun.status == 'completed' |
| |
| 1. Convert CheckRun's status and conclusion to one of Status.state |
| 2. Merge the states based on the following rules: |
       - failure if any of the contexts report as error or failure
       - pending if there are no statuses or a context is pending
       - success if the latest status for all contexts is success
       - error otherwise
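
    For example, the states ['success', 'pending'] merge to 'pending' and
    ['failure', 'success'] merge to 'failure'.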
| |
| Parameters |
| ---------- |
| commit : github3.Commit |
| Commit to query the combined status for. |
| |
| Returns |
| ------- |
| TaskStatus( |
| combined_state='error|failure|pending|success', |
| github_status='original github status object', |
| github_check_runs='github checks associated with the commit', |
| total_count='number of statuses and checks' |
| ) |
| """ |
| |
| def __init__(self, commit): |
| status = commit.status() |
| check_runs = list(commit.check_runs()) |
| states = [s.state for s in status.statuses] |
| |
| for check in check_runs: |
| if check.status == 'completed': |
| if check.conclusion in {'success', 'failure'}: |
| states.append(check.conclusion) |
| elif check.conclusion in {'cancelled', 'timed_out', |
| 'action_required'}: |
| states.append('error') |
| # omit `neutral` conclusion |
| else: |
| states.append('pending') |
| |
        # this could be more efficient, but the following is more descriptive
| combined_state = 'error' |
| if len(states): |
| if any(state in {'error', 'failure'} for state in states): |
| combined_state = 'failure' |
| elif any(state == 'pending' for state in states): |
| combined_state = 'pending' |
| elif all(state == 'success' for state in states): |
| combined_state = 'success' |
| |
| # show link to the actual build, some of the CI providers implement |
| # the statuses API others implement the checks API, so display both |
| build_links = [s.target_url for s in status.statuses] |
| build_links += [c.html_url for c in check_runs] |
| |
| self.combined_state = combined_state |
| self.github_status = status |
| self.github_check_runs = check_runs |
| self.total_count = len(states) |
| self.build_links = build_links |
| |
| |
| class TaskAssets(dict): |
| |
| def __init__(self, github_release, artifact_patterns): |
        # HACK(kszucs): don't expect uploaded assets if no artifacts were
        # defined for the task, in order to spare a bit of github rate limit
| if not artifact_patterns: |
| return |
| |
| if github_release is None: |
| github_assets = {} # no assets have been uploaded for the task |
| else: |
| github_assets = {a.name: a for a in github_release.assets()} |
| |
| for pattern in artifact_patterns: |
| # artifact can be a regex pattern |
| compiled = re.compile(pattern) |
| matches = list( |
| filter(None, map(compiled.match, github_assets.keys())) |
| ) |
| num_matches = len(matches) |
| |
            # validate that the artifact pattern matches a single asset
| if num_matches == 0: |
| self[pattern] = None |
| elif num_matches == 1: |
| self[pattern] = github_assets[matches[0].group(0)] |
            else:
                raise CrossbowError(
                    'Only a single asset should match pattern `{}`, there '
                    'are multiple ones: {}'.format(
                        pattern, ', '.join(m.group(0) for m in matches)
                    )
                )
| |
| def missing_patterns(self): |
| return [pattern for pattern, asset in self.items() if asset is None] |
| |
| def uploaded_assets(self): |
| return [asset for asset in self.values() if asset is not None] |
| |
| |
| class Job(Serializable): |
| """Describes multiple tasks against a single target repository""" |
| |
| def __init__(self, target, tasks, params=None, template_searchpath=None): |
| if not tasks: |
| raise ValueError('no tasks were provided for the job') |
        if not all(isinstance(task, Task) for task in tasks.values()):
            raise ValueError('each task in `tasks` must be an instance '
                             'of Task')
        if not isinstance(target, Target):
            raise ValueError('`target` must be an instance of Target')
        if params is not None and not isinstance(params, dict):
            raise ValueError('`params` must be an instance of dict')
| |
| self.target = target |
| self.tasks = tasks |
| self.params = params or {} # additional parameters for the tasks |
| self.branch = None # filled after adding to a queue |
| self._queue = None # set by the queue object after put or get |
| if template_searchpath is None: |
| self._template_searchpath = ArrowSources.find().path |
| else: |
| self._template_searchpath = template_searchpath |
| |
| def render_files(self): |
| with StringIO() as buf: |
| yaml.dump(self, buf) |
| content = buf.getvalue() |
| tree = {**_default_tree, "job.yml": content} |
| return _unflatten_tree(tree) |
| |
| def render_tasks(self, params=None): |
| result = {} |
| params = { |
| **self.params, |
| "arrow": self.target, |
| **(params or {}) |
| } |
| for task_name, task in self.tasks.items(): |
| files = task.render_files(self._template_searchpath, params) |
| result[task_name] = files |
| return result |
| |
| @property |
| def template_searchpath(self): |
| return self._template_searchpath |
| |
| @property |
| def queue(self): |
| assert isinstance(self._queue, Queue) |
| return self._queue |
| |
| @queue.setter |
| def queue(self, queue): |
| assert isinstance(queue, Queue) |
| self._queue = queue |
| for task in self.tasks.values(): |
| task._queue = queue |
| |
| @property |
| def email(self): |
| return os.environ.get('CROSSBOW_EMAIL', self.target.email) |
| |
| @property |
| def date(self): |
| return self.queue.date_of(self) |
| |
| def show(self, stream=None): |
| return yaml.dump(self, stream=stream) |
| |
| @classmethod |
| def from_config(cls, config, target, tasks=None, groups=None, params=None): |
| """ |
        Instantiate a job based on a config.
| |
| Parameters |
| ---------- |
| config : dict |
| Deserialized content of tasks.yml |
| target : Target |
| Describes target repository and revision the builds run against. |
| tasks : Optional[List[str]], default None |
| List of glob patterns for matching task names. |
| groups : Optional[List[str]], default None |
| List of exact group names matching predefined task sets in the |
| config. |
| params : Optional[Dict[str, str]], default None |
| Additional rendering parameters for the task templates. |
| |
| Returns |
| ------- |
| Job |
| |
| Raises |
| ------ |
        CrossbowError
            If invalid groups or tasks have been passed.
| """ |
| task_definitions = config.select(tasks, groups=groups) |
| |
| # instantiate the tasks |
| tasks = {} |
| versions = {'version': target.version, |
| 'no_rc_version': target.no_rc_version, |
| 'no_rc_semver_version': target.no_rc_semver_version} |
| for task_name, task in task_definitions.items(): |
            artifacts = task.pop('artifacts', None) or []  # yaml may give None
| artifacts = [fn.format(**versions) for fn in artifacts] |
| tasks[task_name] = Task(artifacts=artifacts, **task) |
| |
| return cls(target=target, tasks=tasks, params=params, |
| template_searchpath=config.template_searchpath) |
| |
| def is_finished(self): |
| for task in self.tasks.values(): |
| status = task.status(force_query=True) |
| if status.combined_state == 'pending': |
| return False |
| return True |
| |
| def wait_until_finished(self, poll_max_minutes=120, |
| poll_interval_minutes=10): |
| started_at = time.time() |
| while True: |
| if self.is_finished(): |
| break |
| |
| waited_for_minutes = (time.time() - started_at) / 60 |
| if waited_for_minutes > poll_max_minutes: |
| msg = ('Exceeded the maximum amount of time waiting for job ' |
| 'to finish, waited for {} minutes.') |
| raise RuntimeError(msg.format(waited_for_minutes)) |
| |
| logger.info('Waiting {} minutes and then checking again' |
| .format(poll_interval_minutes)) |
| time.sleep(poll_interval_minutes * 60) |
| |
| |
| class Config(dict): |
| |
| def __init__(self, tasks, template_searchpath): |
| super().__init__(tasks) |
| self.template_searchpath = template_searchpath |
| |
| @classmethod |
| def load_yaml(cls, path): |
| path = Path(path) |
| searchpath = path.parent |
| rendered = _render_jinja_template(searchpath, template=path.name, |
| params={}) |
| config = yaml.load(rendered) |
| return cls(config, template_searchpath=searchpath) |
| |
| def show(self, stream=None): |
| return yaml.dump(dict(self), stream=stream) |
| |
| def select(self, tasks=None, groups=None): |
| config_groups = dict(self['groups']) |
| config_tasks = dict(self['tasks']) |
| valid_groups = set(config_groups.keys()) |
| valid_tasks = set(config_tasks.keys()) |
| group_whitelist = list(groups or []) |
| task_whitelist = list(tasks or []) |
| |
| # validate that the passed groups are defined in the config |
| requested_groups = set(group_whitelist) |
| invalid_groups = requested_groups - valid_groups |
| if invalid_groups: |
| msg = 'Invalid group(s) {!r}. Must be one of {!r}'.format( |
| invalid_groups, valid_groups |
| ) |
| raise CrossbowError(msg) |
| |
| # merge the tasks defined in the selected groups |
| task_patterns = [list(config_groups[name]) for name in group_whitelist] |
| task_patterns = set(sum(task_patterns, task_whitelist)) |
| |
| # treat the task names as glob patterns to select tasks more easily |
| requested_tasks = set() |
| for pattern in task_patterns: |
| matches = fnmatch.filter(valid_tasks, pattern) |
| if len(matches): |
| requested_tasks.update(matches) |
| else: |
| raise CrossbowError( |
| "Unable to match any tasks for `{}`".format(pattern) |
| ) |
| |
| # validate that the passed and matched tasks are defined in the config |
| invalid_tasks = requested_tasks - valid_tasks |
| if invalid_tasks: |
| msg = 'Invalid task(s) {!r}. Must be one of {!r}'.format( |
| invalid_tasks, valid_tasks |
| ) |
| raise CrossbowError(msg) |
| |
| return { |
| task_name: config_tasks[task_name] for task_name in requested_tasks |
| } |
| |
| def validate(self): |
        # validate that the task groups are properly referencing the tasks
| for group_name, group in self['groups'].items(): |
| for pattern in group: |
| tasks = self.select(tasks=[pattern]) |
| if not tasks: |
| raise CrossbowError( |
| "The pattern `{}` defined for task group `{}` is not " |
| "matching any of the tasks defined in the " |
| "configuration file.".format(pattern, group_name) |
| ) |
| |
| # validate that the tasks are constructible |
| for task_name, task in self['tasks'].items(): |
| try: |
| Task(**task) |
| except Exception as e: |
| raise CrossbowError( |
| 'Unable to construct a task object from the ' |
| 'definition of task `{}`. The original error message ' |
| 'is: `{}`'.format(task_name, str(e)) |
| ) |
| |
        # validate that the defined tasks are renderable; in order to do
        # that, construct the required target object with dummy data
| target = Target( |
| head='e279a7e06e61c14868ca7d71dea795420aea6539', |
| branch='master', |
| remote='https://github.com/apache/arrow', |
| version='1.0.0dev123', |
| email='dummy@example.ltd' |
| ) |
| |
| for task_name, task in self['tasks'].items(): |
| task = Task(**task) |
| files = task.render_files( |
| self.template_searchpath, |
| params=dict( |
| arrow=target, |
| queue_remote_url='https://github.com/org/crossbow' |
| ) |
| ) |
| if not files: |
| raise CrossbowError('No files have been rendered for task `{}`' |
| .format(task_name)) |
| |
| |
| # configure yaml serializer |
| yaml = YAML() |
| yaml.register_class(Job) |
| yaml.register_class(Task) |
| yaml.register_class(Target) |
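
# A minimal sketch of how the pieces above fit together (the paths, the token
# and the 'wheel-*' pattern are illustrative):
#
#   config = Config.load_yaml('dev/tasks/tasks.yml')
#   queue = Queue('path/to/crossbow', github_token='...', require_https=True)
#   target = Target.from_repo(Repo('path/to/arrow'))
#   job = Job.from_config(config, target=target, tasks=['wheel-*'])
#   queue.put(job, prefix='build')
#   queue.push()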