blob: 57deee4c43d164a983d17ce410b84113b823c9c0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import logging
import json
import time
import requests
import six.moves.urllib.parse
import six.moves.urllib.request
import six.moves.urllib.error
from datetime import datetime
from tg import config, session, redirect, request, expose, flash
from tg.decorators import without_trailing_slash
from tg import tmpl_context as c
from requests_oauthlib import OAuth2Session
from formencode import validators as fev
from allura.lib.security import is_site_admin
from forgeimporters import base
from urllib.parse import urlparse
log = logging.getLogger(__name__)
class GitHubURLValidator(fev.FancyValidator):
    """Validate that a submitted value is an http(s) URL on ``github.com``.

    Surrounding whitespace is stripped before any check is made, and the
    stripped URL is what the validator returns.
    """

    # The trailing group anchors the end of the host name so that look-alike
    # hosts (``github.com.example.org``, ``github.community``) do not pass a
    # simple prefix match.
    regex = r'https?:\/\/github\.com(?:[/?#]|$)'

    def _convert_to_python(self, value, state):
        """Return the stripped URL, or raise ``fev.Invalid``.

        :param value: the raw string submitted by the user
        :param state: validation state, passed through to ``fev.Invalid``
        :raises fev.Invalid: 'Invalid URL' when the value has no URL scheme,
            'Invalid Github URL' when it does not point at github.com
        """
        url = value.strip()
        if not urlparse(url).scheme:
            raise fev.Invalid('Invalid URL', value, state)
        # Match against the stripped value; matching the raw value would
        # reject an otherwise valid URL that merely has leading whitespace.
        if not re.match(self.regex, url):
            raise fev.Invalid('Invalid Github URL', value, state)
        return url
class GitHubProjectNameValidator(fev.FancyValidator):
    """Validate a GitHub repository name entered on an import form.

    The name must consist of the allowed characters only, and the
    ``<user>/<project>`` repository must pass
    ``GitHubProjectExtractor.check_readable()`` using the current user's
    stored token.
    """

    not_empty = True
    messages = {
        'invalid': 'Valid symbols are: letters, numbers, dashes, '
                   'underscores and periods',
        'unavailable': 'This is not a valid Github project that can be used for import',
    }

    def _convert_to_python(self, value, state=None):
        """Return the stripped project name, or raise ``fev.Invalid``.

        The owner name is read from ``state.full_dict``: ``gh_user_name``
        when present, otherwise ``user_name``, otherwise an empty string.
        """
        fallback_owner = state.full_dict.get('user_name', '')
        owner = state.full_dict.get('gh_user_name', fallback_owner).strip()
        project_name = value.strip()

        if re.match(r'^[a-zA-Z0-9-_.]+$', project_name) is None:
            raise fev.Invalid(self.message('invalid', state), value, state)

        # Only hit the network once the name is known to be syntactically valid.
        extractor = GitHubProjectExtractor(f'{owner}/{project_name}', user=c.user)
        if extractor.check_readable():
            return project_name
        raise fev.Invalid(self.message('unavailable', state), value, state)
class GitHubProjectExtractor(base.ProjectExtractor):
    """Project extractor that reads repository data from the GitHub REST API.

    Adds token authentication, rate-limit back-off and ``Link``-header
    pagination on top of ``base.ProjectExtractor``.
    """

    # URL templates keyed by page name; ``{project_name}`` is filled in
    # elsewhere (presumably by the base class's ``get_page_url`` -- confirm).
    PAGE_MAP = {
        'project_info': 'https://api.github.com/repos/{project_name}',
        'issues': 'https://api.github.com/repos/{project_name}/issues',
        'wiki_url': 'https://github.com/{project_name}.wiki',
    }
    # Issue states that are queried one after another by ``iter_issues``.
    POSSIBLE_STATES = ('open', 'closed')
    # Only these event types are yielded by ``iter_events``.
    SUPPORTED_ISSUE_EVENTS = ('closed', 'reopened', 'assigned')
    # Captures the URL of the ``rel="next"`` entry of a ``Link`` header.
    NEXT_PAGE_URL_RE = re.compile(r'<([^>]*)>; rel="next"')

    def __init__(self, *args, **kw):
        """Accept an optional ``user`` keyword whose stored GitHub token is used.

        ``user`` is removed from ``kw`` before the remaining arguments are
        handed to the base class constructor.
        """
        self.token = None
        user = kw.pop('user', None)
        if user:
            self.token = user.get_tool_data('GitHubProjectImport', 'token')
        super().__init__(*args, **kw)

    def add_token(self, url):
        """Return ``(url, headers)`` where headers carry the bearer token, if any.

        The URL itself is returned unchanged.
        """
        headers = {}
        if self.token:
            headers['Authorization'] = f'Bearer {self.token}'
        return url, headers

    def wait_for_limit_reset(self, headers):
        """Log a warning and sleep until the rate limit window resets.

        :param headers: response headers containing ``X-RateLimit-Reset``
            (epoch seconds) and ``X-RateLimit-Limit``
        """
        reset = headers.get('X-RateLimit-Reset')
        limit = headers.get('X-RateLimit-Limit')
        reset = datetime.utcfromtimestamp(int(reset))
        now = datetime.utcnow()
        # 60/hour is for GitHub unauthenticated users. If you get that, check your auth tokens
        log.warning('Rate limit exceeded (%s requests/hour). '
                    'Sleeping until %s UTC' % (limit, reset))
        # Clamp at zero: if the reset time is already in the past (clock skew,
        # slow response) the delta is negative and time.sleep() would raise.
        time.sleep(max(0.0, (reset - now).total_seconds() + 2))

    def urlopen(self, url, headers=None, use_auth_headers_on_redirects=True, **kw):
        """
        Open ``url`` with authentication, retrying after a rate-limit pause.

        :param url: the URL
        :param headers: dict of headers
        :param use_auth_headers_on_redirects: in some cases (assets on AWS) you need to set this to False
        :param kw: extra args to urlopen
        """
        # Work on a copy so the caller's dict never receives the auth token.
        headers = dict(headers) if headers else {}
        url, auth_headers = self.add_token(url)
        if use_auth_headers_on_redirects:
            headers.update(auth_headers)
            unredirected_hdrs = {}
        else:
            unredirected_hdrs = auth_headers
        try:
            return super().urlopen(url, headers=headers, unredirected_hdrs=unredirected_hdrs, **kw)
        except six.moves.urllib.error.HTTPError as e:
            # GitHub will return 403 if rate limit exceeded.
            if e.code == 403 and e.info().get('X-RateLimit-Remaining') == '0':
                self.wait_for_limit_reset(e.info())
                # call ourselves to try again:
                return self.urlopen(url, headers=headers,
                                    use_auth_headers_on_redirects=use_auth_headers_on_redirects,
                                    **kw)
            else:
                raise

    def check_readable(self):
        """Return True when a HEAD request for the project info URL gives 200."""
        url, headers = self.add_token(self.get_page_url('project_info'))
        headers['User-Agent'] = 'Allura Data Importer (https://allura.apache.org/)'
        resp = requests.head(url, headers=headers, timeout=10)
        return resp.status_code == 200

    def get_next_page_url(self, link):
        """Extract the ``rel="next"`` URL from a ``Link`` header value.

        :param link: the raw ``Link`` header, or a falsy value if absent
        :return: the next page URL, or None when there is no next page
        """
        if not link:
            return None
        # search(), not match(): the "next" entry is not guaranteed to be the
        # first one in the header (a "prev" entry may precede it).
        m = self.NEXT_PAGE_URL_RE.search(link)
        return m.group(1) if m else None

    def parse_page(self, page):
        """Decode a JSON response and return ``(data, next_page_url)``."""
        # Look at link header to handle pagination
        link = page.info().get('Link')
        next_page_url = self.get_next_page_url(link)
        return json.loads(page.read().decode('utf8')), next_page_url

    def get_page(self, page_name_or_url, **kw):
        """Fetch a page and every following page, returning the combined data.

        Each base-class ``get_page`` result is unpacked as
        ``(data, next_page_url)``; data from follow-up pages is appended to
        the first page with ``+=``. The result is stored on ``self.page``.
        """
        page = super().get_page(
            page_name_or_url, **kw)
        page, next_page_url = page
        while next_page_url:
            p = super().get_page(next_page_url, **kw)
            p, next_page_url = p
            page += p
        self.page = page
        return self.page

    def get_summary(self):
        """Return the ``description`` field of the project info."""
        return self.get_page('project_info').get('description')

    def get_homepage(self):
        """Return the ``homepage`` field of the project info."""
        return self.get_page('project_info').get('homepage')

    def get_repo_url(self):
        """Return the ``clone_url`` field of the project info."""
        return self.get_page('project_info').get('clone_url')

    def iter_issues(self):
        """Yield ``(number, issue)`` for all issues, ordered by issue number."""
        # github api doesn't allow getting closed and opened tickets in one
        # query
        issues = []
        url = self.get_page_url('issues') + '?state={state}'
        for state in self.POSSIBLE_STATES:
            issue_list_url = url.format(
                state=state,
            )
            issues += self.get_page(issue_list_url)
        issues.sort(key=lambda x: x['number'])
        for issue in issues:
            yield (issue['number'], issue)

    def iter_comments(self, issue):
        """Yield every comment found at the issue's ``comments_url``."""
        comments_url = issue['comments_url']
        comments = self.get_page(comments_url)
        yield from comments

    def iter_events(self, issue):
        """Yield the issue's events whose type is in SUPPORTED_ISSUE_EVENTS."""
        events_url = issue['events_url']
        events = self.get_page(events_url)
        for event in events:
            if event.get('event') in self.SUPPORTED_ISSUE_EVENTS:
                yield event

    def has_wiki(self):
        """Return the ``has_wiki`` field of the project info."""
        return self.get_page('project_info').get('has_wiki')

    def has_tracker(self):
        """Return the ``has_issues`` field of the project info."""
        return self.get_page('project_info').get('has_issues')
def oauth_app_basic_auth(config):
    """Build HTTP Basic credentials from the configured OAuth app id and secret.

    :param config: mapping providing ``github_importer.client_id`` and
        ``github_importer.client_secret``
    """
    credentials = (
        config['github_importer.client_id'],
        config['github_importer.client_secret'],
    )
    return requests.auth.HTTPBasicAuth(*credentials)
def valid_access_token(access_token, scopes_required=None):
    """Return True if the token is accepted and grants every required scope.

    :param access_token: the OAuth access token to check
    :param scopes_required: optional iterable of scope names; when empty or
        None only the status of the token check is considered
    :return: False when the token check does not answer 200 or a required
        scope is not listed in the response, True otherwise
    """
    tok_details = access_token_details(access_token)
    if tok_details.status_code != 200:
        return False
    if not scopes_required:
        return True
    # Parse the body once instead of once per required scope; a missing or
    # null "scopes" entry is treated as "no scopes granted".
    granted = tok_details.json().get('scopes') or ()
    return all(scope_req in granted for scope_req in scopes_required)
def access_token_details(access_token):
    """POST the token to GitHub's token-check endpoint and return the response.

    The request is authenticated with the OAuth app's basic credentials and
    uses a 10 second timeout.
    """
    # https://developer.github.com/v3/apps/oauth_applications/#check-a-token
    client_id = config['github_importer.client_id']
    url = f'https://api.github.com/applications/{client_id}/token'
    app_auth = oauth_app_basic_auth(config)
    payload = {'access_token': access_token}
    return requests.post(url, auth=app_auth, timeout=10, json=payload)
class GitHubOAuthMixin:
    """
    Support for github oauth web application flow.  This is an "OAuth App" not a "GitHub App"
    """

    def oauth_begin(self, scope=None):  # type: (list[str]) -> None
        """Redirect the user to GitHub for authorization when a token is needed.

        Returns without redirecting when the user is anonymous, when the
        OAuth app is not configured, or when the stored token is still valid
        for the requested scopes.
        """
        if c.user.is_anonymous():
            log.info("User needs authorization before importing a project")
            return None
        client_id = config.get('github_importer.client_id')
        secret = config.get('github_importer.client_secret')
        if not client_id or not secret:
            msg = 'github_importer.* not set up in .ini file; cannot use OAuth for GitHub'
            log.warning(msg)
            if is_site_admin(c.user):
                flash(msg, 'error')
            return  # GitHub app is not configured
        access_token = c.user.get_tool_data('GitHubProjectImport', 'token')
        if access_token and valid_access_token(access_token, scopes_required=scope):
            return
        redirect_uri = request.url.rstrip('/') + '/oauth_callback'
        oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scope)
        auth_url, state = oauth.authorization_url(
            'https://github.com/login/oauth/authorize')
        # Used in callback to prevent CSRF
        session['github.oauth.state'] = state
        # Remember where to send the user once the callback has completed.
        session['github.oauth.redirect'] = request.url
        session.save()
        redirect(auth_url)

    @without_trailing_slash
    @expose()
    def oauth_callback(self, **kw):
        """Exposed endpoint GitHub redirects back to; delegates to the handler."""
        self.handle_oauth_callback()

    def handle_oauth_callback(self):
        """Exchange the authorization response for a token and store it.

        The token is saved in the user's ``GitHubProjectImport`` tool data,
        ``oauth_callback_complete`` is invoked, and the user is redirected to
        the URL remembered by ``oauth_begin`` (or ``/``).
        """
        client_id = config.get('github_importer.client_id')
        secret = config.get('github_importer.client_secret')
        if not client_id or not secret:
            return  # GitHub app is not configured
        oauth = OAuth2Session(
            client_id, state=session.get('github.oauth.state'))
        token = oauth.fetch_token(
            'https://github.com/login/oauth/access_token',
            client_secret=secret,
            authorization_response=request.url
        )
        c.user.set_tool_data('GitHubProjectImport',
                             token=token['access_token'])
        self.oauth_callback_complete()
        redirect(session.get('github.oauth.redirect', '/'))

    def oauth_callback_complete(self):
        """Subclasses can implement this to perform additional actions when
        token is retrieved"""
        pass

    def oauth_has_access(self, scope):
        """Return True if the user's stored token grants ``scope``.

        :param scope: a single scope name; a falsy value yields False
        :return: False when no token is stored, when the token check does not
            answer 200, or when ``scope`` is not among the granted scopes
        """
        if not scope:
            return False
        token = c.user.get_tool_data('GitHubProjectImport', 'token')
        if not token:
            return False
        r = access_token_details(token)
        # Any non-200 answer (not only 404) means the token cannot be
        # confirmed; such bodies need not contain a "scopes" entry at all.
        if r.status_code != 200:
            return False
        scopes = r.json().get('scopes') or ()
        return scope in scopes