blob: d3d8388d05a66fddf512782bb875ca8b7d0fb12b [file] [log] [blame]
#!/usr/bin/env python3
"""A script to set gitlab committers according to COMMITTERS.rst."""
import argparse
import json
import logging
import pathlib
import sys
import re
import urllib.request
import urllib.parse
from pprint import pformat
from typing import Any, Dict, List, Tuple
API_BASE = "https://gitlab.com/api/v4"
def main():
"""Parse CLI arguments and set up application state."""
parser = argparse.ArgumentParser(
description="Update gitlab committers according to COMMITTERS.rst"
)
parser.add_argument(
"token", type=str,
help="Your private access token. See https://gitlab.com/profile/personal_access_tokens."
)
parser.add_argument(
"committers", type=pathlib.Path,
help="The path to COMMITTERS.rst. Will try "
"to find the one in the local git repository by default.",
nargs="?",
default=find_repository_root() / "COMMITTERS.rst"
)
parser.add_argument(
"--repository", "-r", type=str,
help="The repository whose committers to set.",
default="BuildStream/buildstream"
)
parser.add_argument(
"--dry-run", "-n",
help="Do not update the actual member list.",
action="store_false"
)
parser.add_argument(
"--quiet", "-q",
help="Run quietly",
action="store_true"
)
parser.add_argument(
"--debug", "-d",
help="Show debug messages (WARNING: This *will* display the private token).",
action="store_true"
)
args = parser.parse_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
elif not args.quiet:
logging.basicConfig(level=logging.INFO)
set_committers(args.repository, args.committers.read_text(), args.token, args.dry_run)
def set_committers(repository: str, committer_file: str, token: str, commit: bool):
"""Set GitLab members as defined in the committer_file."""
# new_committers = [get_user_by_username(committer[2], token)
# for committer in parse_committer_list(committer_file)]
committers = get_allowed_to_push(repository, token)
logging.info(pformat(committers))
# logging.debug("Committer list: %s", pformat(old_committers))
# logging.debug("GitLab committers: %s", pformat(old_committers))
# new_set = set(committer["id"] for committer in new_committers)
# old_set = set(committer["id"] for committer in old_committers)
# added = [committer for committer in new_committers if committer["id"] in new_set - old_set]
# removed = [committer for committer in old_committers if committer["id"] in old_set - new_set]
# logging.info("Adding:\n%s", pformat(added))
# # logging.info("Removing:\n%s", pformat(removed))
# if commit:
# for committer in added:
# if not (committer in committers and committer["access_level"] >= 30):
# set_user_access_level(repository, committer, 30, token)
# set_allowed_to_push(repository, committer, True, token)
####################################################################################################
# Utility functions #
####################################################################################################
class RepositoryException(Exception):
"""An exception raised when we can't deal with the repository."""
def find_repository_root() -> pathlib.Path:
"""Find the root directory of a git repository, starting at cwd()."""
root = pathlib.Path.cwd()
while not any(f.name == ".git" for f in root.iterdir()):
if root == root.parent:
raise RepositoryException("'{}' is not in a git repository.".format(pathlib.Path.cwd()))
root = root.parent
return root
def parse_committer_list(committer_text: str) -> List[Tuple[str, str, str]]:
"""Parse COMMITTERS.rst and retrieve a map of names, email addresses and usernames."""
return [(committer[0].strip(), committer[1].strip(" <>"), committer[2].strip()) for committer in
re.findall(r"\|([^|]+)\|([^|]+)\|([^|]+)\|\n\+-", committer_text)]
####################################################################################################
# GitLab API #
####################################################################################################
def make_request_url(*args: Tuple[str], **kwargs: Dict[str, str]) -> str:
"""Create a request url for the GitLab API."""
return API_BASE + "/" + "/".join(args) + "?" + urllib.parse.urlencode(kwargs, safe='@')
def make_project_url(project: str, *args: Tuple[str], **kwargs: Dict[str, str]) -> str:
"""Create a request url for the given project."""
return make_request_url("projects", urllib.parse.quote(project, safe=''), *args, **kwargs)
def urlopen(url: str, token: str, data=None, method='GET') -> Any:
"""Perform a paginated query to the GitLab API."""
page = 1
res = None
result = []
while not res or page:
req = urllib.request.Request(url=url + "&" + urllib.parse.urlencode({"page": page}),
data=data, method=method)
req.add_header('PRIVATE-TOKEN', token)
logging.debug("Making API request: %s", pformat(req.__dict__))
try:
res = urllib.request.urlopen(req)
except urllib.error.HTTPError as err:
logging.error("Could not access '%s': %s", url, err)
raise
result.extend(json.load(res))
page = res.getheader('X-Next-Page')
return result
def get_members(project: str, token: str) -> List[Dict]:
"""Get a list of current committers."""
try:
return [committer for committer in
urlopen(make_project_url(project, "members", "all"), token)]
except urllib.error.HTTPError as err:
sys.exit(1)
def get_allowed_to_push(project: str, token: str):
"""Get a list of members who are allowed to push to protected branches."""
try:
return urlopen(make_project_url(project, "protected_branches", name="master"), token)
except urllib.error.HTTPError as err:
sys.exit(1)
def get_user_by_username(username: str, token: str) -> Dict:
"""Get a user ID from their email address."""
try:
return urlopen(make_request_url("users",
username=username),
token)[0]
except urllib.error.HTTPError as err:
sys.exit(1)
def set_user_access_level(project: str, user: Dict, level: int, token: str):
"""Set a user's project access level."""
try:
urlopen(make_project_url(project, "members", str(user["id"]),
access_level=str(level)),
token,
method="PUT")
except urllib.error.HTTPError as err:
logging.error("This probably means that their permissions are set in the group.")
def set_allowed_to_push(project: str, user: Dict, permission: bool, token: str):
permission = str(1) if permission else str(0)
try:
urlopen(make_project_url(project, "protected_branches",
name="master") + "&allowed_to_push[][{}]={}".format(user["id"], permission),
token,
method="POST")
except urllib.error.HTTPError:
sys.exit(1)
if __name__ == '__main__':
main()