| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import logging |
| import os |
| from datetime import datetime, timedelta |
| import re |
| from urlparse import urlparse, urljoin |
| |
| import bson |
| import tg |
| from allura.lib.exceptions import InvalidRecoveryCode |
| from tg import expose, flash, redirect, validate, config, session |
| from tg.decorators import with_trailing_slash, without_trailing_slash |
| from pylons import tmpl_context as c, app_globals as g |
| from pylons import request, response |
| from webob import exc as wexc |
| from paste.deploy.converters import asbool |
| from cryptography.hazmat.primitives.twofactor import InvalidToken |
| |
| import allura.tasks.repo_tasks |
| from allura import model as M |
| from allura.lib import validators as V |
| from allura.lib.security import require_authenticated, has_access |
| from allura.lib import helpers as h |
| from allura.lib import plugin |
| from allura.lib.decorators import require_post, reconfirm_auth |
| from allura.lib.repository import RepositoryApp |
| from allura.lib.widgets import ( |
| SubscriptionForm, |
| OAuthApplicationForm, |
| OAuthRevocationForm, |
| LoginForm, |
| ForgottenPasswordForm, |
| DisableAccountForm) |
| from allura.lib.widgets import forms, form_fields as ffw |
| from allura.lib import mail_util |
| from allura.lib.multifactor import TotpService, RecoveryCodeService |
| from allura.controllers import BaseController |
| from allura.tasks.mail_tasks import send_system_mail_to_user |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class F(object): |
| login_form = LoginForm() |
| password_change_form = forms.PasswordChangeForm(action='/auth/preferences/change_password') |
| upload_key_form = forms.UploadKeyForm(action='/auth/preferences/upload_sshkey') |
| recover_password_change_form = forms.PasswordChangeBase() |
| forgotten_password_form = ForgottenPasswordForm() |
| subscription_form = SubscriptionForm() |
| registration_form = forms.RegistrationForm(action='/auth/save_new') |
| oauth_application_form = OAuthApplicationForm(action='register') |
| oauth_revocation_form = OAuthRevocationForm( |
| action='/auth/preferences/revoke_oauth') |
| change_personal_data_form = forms.PersonalDataForm() |
| add_socialnetwork_form = forms.AddSocialNetworkForm() |
| remove_socialnetwork_form = forms.RemoveSocialNetworkForm() |
| add_telnumber_form = forms.AddTelNumberForm() |
| add_website_form = forms.AddWebsiteForm() |
| skype_account_form = forms.SkypeAccountForm() |
| remove_textvalue_form = forms.RemoveTextValueForm() |
| add_timeslot_form = forms.AddTimeSlotForm() |
| remove_timeslot_form = forms.RemoveTimeSlotForm() |
| add_inactive_period_form = forms.AddInactivePeriodForm() |
| remove_inactive_period_form = forms.RemoveInactivePeriodForm() |
| save_skill_form = forms.AddUserSkillForm() |
| remove_skill_form = forms.RemoveSkillForm() |
| disable_account_form = DisableAccountForm() |
| |
| |
| class AuthController(BaseController): |
| |
| def __init__(self): |
| self.preferences = PreferencesController() |
| self.user_info = UserInfoController() |
| self.subscriptions = SubscriptionsController() |
| self.oauth = OAuthController() |
| if asbool(config.get('auth.allow_user_to_disable_account', False)): |
| self.disable = DisableAccountController() |
| |
| def __getattr__(self, name): |
| urls = plugin.UserPreferencesProvider.get().additional_urls() |
| if name not in urls: |
| raise AttributeError("'%s' object has no attribute '%s'" % (type(self).__name__, name)) |
| return urls[name] |
| |
| @expose() |
| def prefs(self, *args, **kwargs): |
| ''' |
| Redirect old /auth/prefs URL to /auth/subscriptions |
| (to handle old email links, etc). |
| ''' |
| redirect('/auth/subscriptions/') |
| |
| @expose('jinja:allura:templates/login.html') |
| @with_trailing_slash |
| def index(self, *args, **kwargs): |
| orig_request = request.environ.get('pylons.original_request', None) |
| if 'return_to' in kwargs: |
| return_to = kwargs.pop('return_to') |
| elif orig_request: |
| return_to = orig_request.url |
| else: |
| return_to = request.referer |
| c.form = F.login_form |
| return dict(return_to=return_to) |
| |
| @expose('jinja:allura:templates/login_fragment.html') |
| def login_fragment(self, *args, **kwargs): |
| return self.index(*args, **kwargs) |
| |
| @expose('jinja:allura:templates/create_account.html') |
| def create_account(self, **kw): |
| if not asbool(config.get('auth.allow_user_registration', True)): |
| raise wexc.HTTPNotFound() |
| c.form = F.registration_form |
| return dict() |
| |
| def _validate_hash(self, hash): |
| login_url = config.get('auth.login_url', '/auth/') |
| if not hash: |
| redirect(login_url) |
| user_record = M.User.query.find( |
| {'tool_data.AuthPasswordReset.hash': hash}).first() |
| if not user_record: |
| log.info('Reset hash not found: {}'.format(hash)) |
| flash('Unable to process reset, please try again') |
| redirect(login_url) |
| hash_expiry = user_record.get_tool_data( |
| 'AuthPasswordReset', 'hash_expiry') |
| if not hash_expiry or hash_expiry < datetime.utcnow(): |
| log.info('Reset hash expired: {} {}'.format(hash, hash_expiry)) |
| flash('Unable to process reset, please try again') |
| redirect(login_url) |
| return user_record |
| |
| @expose('jinja:allura:templates/forgotten_password.html') |
| def forgotten_password(self, hash=None, **kw): |
| provider = plugin.AuthenticationProvider.get(request) |
| if not provider.forgotten_password_process: |
| raise wexc.HTTPNotFound() |
| user_record = None |
| if not hash: |
| c.forgotten_password_form = F.forgotten_password_form |
| else: |
| user_record = self._validate_hash(hash) |
| c.recover_password_change_form = F.recover_password_change_form |
| return dict(hash=hash, user_record=user_record) |
| |
| @expose() |
| @require_post() |
| @validate(F.recover_password_change_form, error_handler=forgotten_password) |
| def set_new_password(self, hash=None, pw=None, pw2=None): |
| provider = plugin.AuthenticationProvider.get(request) |
| if not provider.forgotten_password_process: |
| raise wexc.HTTPNotFound() |
| user = self._validate_hash(hash) |
| user.set_password(pw) |
| user.set_tool_data('AuthPasswordReset', hash='', hash_expiry='') # Clear password reset token |
| h.auditlog_user('Password changed (through recovery process)', user=user) |
| flash('Password changed') |
| redirect('/auth/?return_to=/') # otherwise the default return_to would be the forgotten_password referrer page |
| |
| @expose() |
| @require_post() |
| def password_recovery_hash(self, email=None, **kw): |
| provider = plugin.AuthenticationProvider.get(request) |
| if not provider.forgotten_password_process: |
| raise wexc.HTTPNotFound() |
| if not email: |
| redirect('/') |
| |
| user_record = M.User.by_email_address(email) |
| allow_non_primary_email_reset = asbool(config.get('auth.allow_non_primary_email_password_reset', True)) |
| |
| if not re.match(r"[^@]+@[^@]+\.[^@]+", email): |
| flash('Enter email in correct format!','error') |
| redirect('/auth/forgotten_password') |
| |
| if not allow_non_primary_email_reset: |
| message = 'If the given email address is on record, '\ |
| 'a password reset email has been sent to the account\'s primary email address.' |
| email_record = M.EmailAddress.get(email=provider.get_primary_email_address(user_record=user_record), |
| confirmed=True) |
| else: |
| message = 'A password reset email has been sent, if the given email address is on record in our system.' |
| email_record = M.EmailAddress.get(email=email, confirmed=True) |
| |
| if user_record and email_record and email_record.confirmed: |
| hash = h.nonce(42) |
| user_record.set_tool_data('AuthPasswordReset', |
| hash=hash, |
| hash_expiry=datetime.utcnow() + |
| timedelta(seconds=int(config.get('auth.recovery_hash_expiry_period', 600)))) |
| |
| log.info('Sending password recovery link to %s', email_record.email) |
| subject = '%s Password recovery' % config['site_name'] |
| text = g.jinja2_env.get_template('allura:templates/mail/forgot_password.txt').render(dict( |
| user=user_record, |
| config=config, |
| hash=hash, |
| )) |
| send_system_mail_to_user(email_record.email, subject, text) |
| h.auditlog_user('Password recovery link sent to: %s', email, user=user_record) |
| flash(message) |
| redirect('/') |
| |
| @expose() |
| @require_post() |
| @validate(F.registration_form, error_handler=create_account) |
| def save_new(self, display_name=None, username=None, pw=None, email=None, **kw): |
| if not asbool(config.get('auth.allow_user_registration', True)): |
| raise wexc.HTTPNotFound() |
| require_email = asbool(config.get('auth.require_email_addr', False)) |
| make_project = not require_email |
| user = M.User.register( |
| dict(username=username, |
| display_name=display_name, |
| password=pw, |
| pending=require_email), make_project) |
| user.set_tool_data('allura', pwd_reset_preserve_session=session.id) |
| # else the first password set causes this session to be invalidated |
| if require_email: |
| em = user.claim_address(email) |
| if em: |
| em.send_verification_link() |
| flash('User "%s" registered. Verification link was sent to your email.' % username) |
| else: |
| plugin.AuthenticationProvider.get(request).login(user) |
| flash('User "%s" registered' % username) |
| redirect('/') |
| |
| @expose() |
| def send_verification_link(self, a): |
| addr = M.EmailAddress.get(email=a, claimed_by_user_id=c.user._id) |
| confirmed_emails = M.EmailAddress.find(dict(email=a, confirmed=True)).all() |
| confirmed_emails = filter(lambda item: item != addr, confirmed_emails) |
| |
| if addr: |
| if any(email.confirmed for email in confirmed_emails): |
| addr.send_claim_attempt() |
| else: |
| addr.send_verification_link() |
| flash('Verification link sent') |
| else: |
| flash('No such address', 'error') |
| redirect(request.referer) |
| |
| def _verify_addr(self, addr, do_auth_check=True): |
| confirmed_by_other = M.EmailAddress.find(dict(email=addr.email, confirmed=True)).all() if addr else [] |
| confirmed_by_other = filter(lambda item: item != addr, confirmed_by_other) |
| |
| if addr and not confirmed_by_other: |
| user = addr.claimed_by_user(include_pending=True) |
| if do_auth_check and not user.pending: |
| # pending is ok, since you can't be logged in to your account yet :) |
| require_authenticated() |
| if c.user != user: |
| flash('You must be logged in to the correct account', 'warning') |
| # raising HTTPUnauthorized does this same logic, but doesn't preserve the flash() message |
| # so we have to do similar logic as LoginRedirectMiddleware right here |
| login_url = tg.config.get('auth.login_url', '/auth/') |
| return_to = request.environ['PATH_INFO'] |
| if request.environ.get('QUERY_STRING'): |
| return_to += '?' + request.environ['QUERY_STRING'] |
| redirect(login_url, {'return_to': return_to}) |
| |
| addr.confirmed = True |
| flash('Email address confirmed') |
| h.auditlog_user('Email address verified: %s', addr.email, user=user) |
| if(user.get_pref('email_address') == None): |
| user.set_pref('email_address', addr.email) |
| if user.pending: |
| plugin.AuthenticationProvider.get(request).activate_user(user) |
| projectname = plugin.AuthenticationProvider.get(request).user_project_shortname(user) |
| n = M.Neighborhood.query.get(name='Users') |
| n.register_project(projectname, user=user, user_project=True) |
| else: |
| flash('Unknown verification link', 'error') |
| |
| @expose() |
| def verify_addr(self, a): |
| addr = M.EmailAddress.get(nonce=a) |
| self._verify_addr(addr) |
| redirect('/auth/preferences/') |
| |
| @expose() |
| def logout(self): |
| plugin.AuthenticationProvider.get(request).logout() |
| redirect(config.get('auth.post_logout_url', '/')) |
| |
| @staticmethod |
| def _verify_return_to(return_to): |
| # protect against any "open redirect" attacks using an external URL |
| if not return_to: |
| return_to = '/' |
| rt_host = urlparse(urljoin(config['base_url'], return_to)).netloc |
| base_host = urlparse(config['base_url']).netloc |
| if rt_host == base_host: |
| return return_to |
| else: |
| return '/' |
| |
| @expose() |
| @require_post() |
| @validate(F.login_form, error_handler=index) |
| def do_login(self, return_to=None, **kw): |
| location = '/' |
| |
| if session.get('multifactor-username'): |
| location = tg.url('/auth/multifactor', dict(return_to=return_to)) |
| elif session.get('expired-username'): |
| if return_to and return_to not in plugin.AuthenticationProvider.pwd_expired_allowed_urls: |
| location = tg.url(plugin.AuthenticationProvider.pwd_expired_allowed_urls[0], dict(return_to=return_to)) |
| else: |
| location = tg.url(plugin.AuthenticationProvider.pwd_expired_allowed_urls[0]) |
| elif return_to and return_to != request.url: |
| location = self._verify_return_to(return_to) |
| |
| redirect(location) |
| |
| @expose('jinja:allura:templates/login_multifactor.html') |
| def multifactor(self, return_to='', mode='totp', **kwargs): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| return dict( |
| return_to=return_to, |
| mode=mode, |
| ) |
| |
| @expose('jinja:allura:templates/login_multifactor.html') |
| @require_post() |
| def do_multifactor(self, code, mode, **kwargs): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| if 'multifactor-username' not in session: |
| tg.flash('Your multifactor login was disrupted, please start over.', 'error') |
| redirect('/auth/', return_to=kwargs.get('return_to', '')) |
| |
| user = M.User.by_username(session['multifactor-username']) |
| try: |
| if mode == 'totp': |
| totp_service = TotpService.get() |
| totp = totp_service.get_totp(user) |
| totp_service.verify(totp, code) |
| elif mode == 'recovery': |
| recovery = RecoveryCodeService.get() |
| recovery.verify_and_remove_code(user, code) |
| h.auditlog_user('Logged in using a multifactor recovery code', user=user) |
| except (InvalidToken, InvalidRecoveryCode): |
| c.form_errors['code'] = 'Invalid code, please try again.' |
| return self.multifactor(mode=mode, **kwargs) |
| else: |
| plugin.AuthenticationProvider.get(request).login(user=user, multifactor_success=True) |
| return_to = self._verify_return_to(kwargs.get('return_to')) |
| redirect(return_to) |
| |
| @expose(content_type='text/plain') |
| def refresh_repo(self, *repo_path): |
| # post-commit hooks use this |
| if not repo_path: |
| return 'No repo specified' |
| repo_path = '/' + '/'.join(repo_path) |
| project, rest = h.find_project(repo_path) |
| if project is None: |
| return 'No project at %s' % repo_path |
| if not rest: |
| return '%s does not include a repo mount point' % repo_path |
| h.set_context(project.shortname, |
| rest[0], neighborhood=project.neighborhood) |
| if c.app is None or not getattr(c.app, 'repo'): |
| return 'Cannot find repo at %s' % repo_path |
| allura.tasks.repo_tasks.refresh.post() |
| return '%r refresh queued.\n' % c.app.repo |
| |
| def _auth_repos(self, user): |
| def _unix_group_name(neighborhood, shortname): |
| path = neighborhood.url_prefix + \ |
| shortname[len(neighborhood.shortname_prefix):] |
| parts = [p for p in path.split('/') if p] |
| if len(parts) == 2 and parts[0] == 'p': |
| parts = parts[1:] |
| return '.'.join(reversed(parts)) |
| |
| repos = [] |
| for p in user.my_projects(): |
| for p in [p] + p.direct_subprojects: |
| for app in p.app_configs: |
| if not issubclass(g.entry_points["tool"][app.tool_name], RepositoryApp): |
| continue |
| if not has_access(app, 'write', user, p): |
| continue |
| repos.append('/%s/%s/%s' % ( |
| app.tool_name.lower(), |
| _unix_group_name(p.neighborhood, p.shortname), |
| app.options['mount_point'])) |
| repos.sort() |
| return repos |
| |
| @expose('json:') |
| def repo_permissions(self, repo_path=None, username=None, **kw): |
| """Expects repo_path to be a filesystem path like |
| <tool>/<project>.<neighborhood>/reponame[.git] |
| unless the <neighborhood> is 'p', in which case it is |
| <tool>/<project>/reponame[.git] |
| |
| Returns JSON describing this user's permissions on that repo. |
| """ |
| disallow = dict(allow_read=False, allow_write=False, |
| allow_create=False) |
| # Find the user |
| user = M.User.by_username(username) |
| if not user: |
| response.status = 404 |
| return dict(disallow, error='unknown user') |
| if not repo_path: |
| return dict(allow_write=self._auth_repos(user)) |
| |
| parts = [p for p in repo_path.split(os.path.sep) if p] |
| # strip the tool name |
| parts = parts[1:] |
| if '.' in parts[0]: |
| project, neighborhood = parts[0].split('.') |
| else: |
| project, neighborhood = parts[0], 'p' |
| parts = [neighborhood, project] + parts[1:] |
| project_path = '/' + '/'.join(parts) |
| project, rest = h.find_project(project_path) |
| if project is None: |
| log.info("Can't find project at %s from repo_path %s", |
| project_path, repo_path) |
| response.status = 404 |
| return dict(disallow, error='unknown project') |
| c.project = project |
| c.app = project.app_instance(rest[0]) |
| if not c.app: |
| c.app = project.app_instance(os.path.splitext(rest[0])[0]) |
| if c.app is None: |
| log.info("Can't find repo at %s on repo_path %s", |
| rest[0], repo_path) |
| return disallow |
| return dict(allow_read=has_access(c.app, 'read')(user=user), |
| allow_write=has_access(c.app, 'write')(user=user), |
| allow_create=has_access(c.app, 'create')(user=user)) |
| |
| @expose('jinja:allura:templates/pwd_expired.html') |
| @without_trailing_slash |
| def pwd_expired(self, **kw): |
| require_authenticated() |
| c.form = F.password_change_form |
| return {'return_to': kw.get('return_to')} |
| |
| @expose() |
| @require_post() |
| @without_trailing_slash |
| @validate(V.NullValidator(), error_handler=pwd_expired) |
| def pwd_expired_change(self, **kw): |
| require_authenticated() |
| return_to = kw.get('return_to') |
| kw = F.password_change_form.to_python(kw, None) |
| ap = plugin.AuthenticationProvider.get(request) |
| try: |
| expired_username = session.get('expired-username') |
| expired_user = M.User.query.get(username=expired_username) if expired_username else None |
| ap.set_password(expired_user or c.user, kw['oldpw'], kw['pw']) |
| expired_user.set_tool_data('allura', pwd_reset_preserve_session=session.id) |
| expired_user.set_tool_data('AuthPasswordReset', hash='', hash_expiry='') # Clear password reset token |
| |
| except wexc.HTTPUnauthorized: |
| flash('Incorrect password', 'error') |
| redirect(tg.url('/auth/pwd_expired', dict(return_to=return_to))) |
| flash('Password changed') |
| session.pop('pwd-expired', None) |
| session['username'] = session.get('expired-username') |
| session.pop('expired-username', None) |
| |
| session.save() |
| h.auditlog_user('Password reset (via expiration process)') |
| if return_to and return_to != request.url: |
| redirect(return_to) |
| else: |
| redirect('/') |
| |
| |
| def select_new_primary_addr(user, ignore_emails=[]): |
| for obj_e in user.email_addresses: |
| obj = user.address_object(obj_e) |
| if obj and obj.confirmed and obj_e not in ignore_emails: |
| return obj_e |
| |
| |
| class PreferencesController(BaseController): |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/user_prefs.html') |
| def index(self, **kw): |
| c.enter_password = ffw.Lightbox(name='enter_password') |
| c.password_change_form = F.password_change_form |
| c.upload_key_form = F.upload_key_form |
| provider = plugin.AuthenticationProvider.get(request) |
| menu = provider.account_navigation() |
| return dict(menu=menu, user=c.user) |
| |
| def _update_emails(self, user, admin=False, form_params={}): |
| # not using **kw in method signature, to ensure 'admin' can't be passed in via a form submit |
| kw = form_params |
| addr = kw.pop('addr', None) |
| new_addr = kw.pop('new_addr', None) |
| primary_addr = kw.pop('primary_addr', None) |
| provider = plugin.AuthenticationProvider.get(request) |
| for i, (old_a, data) in enumerate(zip(user.email_addresses, addr or [])): |
| obj = user.address_object(old_a) |
| if data.get('delete') or not obj: |
| if not admin and (not kw.get('password') or not provider.validate_password(user, kw.get('password'))): |
| flash('You must provide your current password to delete an email', 'error') |
| return |
| if primary_addr == user.email_addresses[i]: |
| if select_new_primary_addr(user, ignore_emails=primary_addr) is None \ |
| and asbool(config.get('auth.require_email_addr', False)): |
| flash('You must have at least one verified email address.', 'error') |
| return |
| else: |
| # clear it now, a new one will get set below |
| user.set_pref('email_address', None) |
| primary_addr = None |
| user.set_tool_data('AuthPasswordReset', hash='', hash_expiry='') |
| h.auditlog_user('Email address deleted: %s', user.email_addresses[i], user=user) |
| del user.email_addresses[i] |
| if obj: |
| obj.delete() |
| if new_addr.get('claim') or new_addr.get('addr'): |
| user.set_tool_data('AuthPasswordReset', hash='', hash_expiry='') # Clear password reset token |
| claimed_emails_limit = config.get('user_prefs.maximum_claimed_emails', None) |
| if claimed_emails_limit and len(user.email_addresses) >= int(claimed_emails_limit): |
| flash('You cannot claim more than %s email addresses.' % claimed_emails_limit, 'error') |
| return |
| if not admin and (not kw.get('password') or not provider.validate_password(user, kw.get('password'))): |
| flash('You must provide your current password to claim new email', 'error') |
| return |
| |
| claimed_emails = M.EmailAddress.find({'email': new_addr['addr']}).all() |
| |
| if any(email.claimed_by_user_id == user._id for email in claimed_emails): |
| flash('Email address already claimed', 'error') |
| |
| elif mail_util.isvalid(new_addr['addr']): |
| em = M.EmailAddress.create(new_addr['addr']) |
| if em: |
| user.email_addresses.append(em.email) |
| em.claimed_by_user_id = user._id |
| |
| confirmed_emails = filter(lambda email: email.confirmed, claimed_emails) |
| if not confirmed_emails: |
| if not admin: |
| em.send_verification_link() |
| else: |
| AuthController()._verify_addr(em, do_auth_check=False) |
| else: |
| em.send_claim_attempt() |
| |
| if not admin: |
| user.set_tool_data('AuthPasswordReset', hash='', hash_expiry='') |
| flash('A verification email has been sent. Please check your email and click to confirm.') |
| |
| h.auditlog_user('New email address: %s', new_addr['addr'], user=user) |
| else: |
| flash('Email address %s is invalid' % new_addr['addr'], 'error') |
| else: |
| flash('Email address %s is invalid' % new_addr['addr'], 'error') |
| if not primary_addr and not user.get_pref('email_address') and user.email_addresses: |
| primary_addr = select_new_primary_addr(user) |
| if primary_addr: |
| if user.get_pref('email_address') != primary_addr: |
| if not admin and (not kw.get('password') or not provider.validate_password(user, kw.get('password'))): |
| flash('You must provide your current password to change primary address', 'error') |
| return |
| h.auditlog_user( |
| 'Primary email changed: %s => %s', |
| user.get_pref('email_address'), |
| primary_addr, |
| user=user) |
| user.set_pref('email_address', primary_addr) |
| user.set_tool_data('AuthPasswordReset', hash='', hash_expiry='') |
| |
| @h.vardec |
| @expose() |
| @require_post() |
| def update_emails(self, **kw): |
| if asbool(config.get('auth.allow_edit_prefs', True)): |
| self._update_emails(c.user, form_params=kw) |
| redirect('.') |
| |
| @h.vardec |
| @expose() |
| @require_post() |
| def update(self, preferences=None, **kw): |
| if asbool(config.get('auth.allow_edit_prefs', True)): |
| if not preferences.get('display_name'): |
| flash("Display Name cannot be empty.", 'error') |
| redirect('.') |
| old = c.user.get_pref('display_name') |
| c.user.set_pref('display_name', preferences['display_name']) |
| if old != preferences['display_name']: |
| h.auditlog_user('Display Name changed %s => %s', old, preferences['display_name']) |
| for k, v in preferences.iteritems(): |
| if k == 'results_per_page': |
| v = int(v) |
| c.user.set_pref(k, v) |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(V.NullValidator(), error_handler=index) |
| def change_password(self, **kw): |
| kw = F.password_change_form.to_python(kw, None) |
| ap = plugin.AuthenticationProvider.get(request) |
| try: |
| ap.set_password(c.user, kw['oldpw'], kw['pw']) |
| c.user.set_tool_data('allura', pwd_reset_preserve_session=session.id) |
| c.user.set_tool_data('AuthPasswordReset', hash='', hash_expiry='') |
| |
| except wexc.HTTPUnauthorized: |
| flash('Incorrect password', 'error') |
| redirect('.') |
| flash('Password changed') |
| h.auditlog_user('Password changed') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| def upload_sshkey(self, key=None): |
| ap = plugin.AuthenticationProvider.get(request) |
| try: |
| ap.upload_sshkey(c.user.username, key) |
| except AssertionError, ae: |
| flash('Error uploading key: %s' % ae, 'error') |
| flash('Key uploaded') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| def user_message(self, allow_user_messages=False): |
| c.user.set_pref('disable_user_messages', not allow_user_messages) |
| redirect(request.referer) |
| |
| @expose('jinja:allura:templates/user_totp.html') |
| @without_trailing_slash |
| @reconfirm_auth |
| def totp_new(self, **kw): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| totp_service = TotpService.get() |
| if 'totp_new_key' not in session: |
| # never been here yet |
| # get a new key |
| totp = totp_service.Totp(key=None) |
| # don't save to database until confirmed, just session for now |
| session['totp_new_key'] = totp.key |
| session.save() |
| else: |
| # use key from session, so we don't regenerate new keys on each page load |
| key = session['totp_new_key'] |
| totp = totp_service.Totp(key) |
| |
| qr = totp_service.get_qr_code(totp, c.user) |
| h.auditlog_user('Visited multifactor new TOTP page') |
| return dict( |
| qr=qr, |
| setup=True, |
| ) |
| |
| @expose('jinja:allura:templates/user_totp.html') |
| @without_trailing_slash |
| @reconfirm_auth |
| def totp_view(self, **kw): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| totp_service = TotpService.get() |
| totp = totp_service.get_totp(c.user) |
| qr = totp_service.get_qr_code(totp, c.user) |
| h.auditlog_user('Viewed multifactor TOTP config page') |
| return dict( |
| qr=qr, |
| setup=False, |
| ) |
| |
| @expose('jinja:allura:templates/user_totp.html') |
| @reconfirm_auth |
| @require_post() |
| @without_trailing_slash |
| def totp_set(self, code, **kw): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| key = session['totp_new_key'] |
| totp_service = TotpService.get() |
| totp = totp_service.Totp(key) |
| try: |
| totp_service.verify(totp, code) |
| except InvalidToken: |
| h.auditlog_user('Failed to set up multifactor TOTP (wrong code)') |
| c.form_errors['code'] = 'Invalid code, please try again.' |
| return self.totp_new(**kw) |
| else: |
| h.auditlog_user('Set up multifactor TOTP') |
| totp_service.set_secret_key(c.user, key) |
| c.user.set_pref('multifactor', True) |
| del session['totp_new_key'] |
| session.save() |
| tg.flash('Two factor authentication has now been set up.') |
| email_body = g.jinja2_env.get_template('allura:templates/mail/twofactor_enabled.md').render(dict( |
| user=c.user, |
| config=config, |
| )) |
| send_system_mail_to_user(c.user, u'Two-Factor Authentication Enabled', email_body) |
| redirect('/auth/preferences/multifactor_recovery') |
| |
| @expose() |
| @require_post() |
| @reconfirm_auth |
| def multifactor_disable(self, **kw): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| h.auditlog_user('Disabled multifactor TOTP') |
| totp_service = TotpService.get() |
| totp_service.set_secret_key(c.user, None) |
| recovery = RecoveryCodeService.get() |
| recovery.delete_all(c.user) |
| c.user.set_pref('multifactor', False) |
| tg.flash('Multifactor authentication has now been disabled.') |
| email_body = g.jinja2_env.get_template('allura:templates/mail/twofactor_disabled.md').render(dict( |
| user=c.user, |
| config=config, |
| )) |
| send_system_mail_to_user(c.user, u'Two-Factor Authentication Disabled', email_body) |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| def totp_send_link(self, **kw): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| email_body = g.jinja2_env.get_template('allura:templates/mail/twofactor_apps.md').render(dict( |
| user=c.user, |
| config=config, |
| )) |
| send_system_mail_to_user(c.user, u'Two-Factor Authentication Apps', email_body) |
| |
| @expose('jinja:allura:templates/user_recovery_codes.html') |
| @reconfirm_auth |
| @without_trailing_slash |
| def multifactor_recovery(self, **kw): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| if not c.user.get_pref('multifactor'): |
| redirect('.') |
| recovery = RecoveryCodeService.get() |
| codes = recovery.get_codes(c.user) |
| if not codes: |
| codes = recovery.regenerate_codes(c.user) |
| h.auditlog_user('Viewed multifactor recovery codes') |
| return dict( |
| codes=codes, |
| ) |
| |
| @expose() |
| @require_post() |
| @reconfirm_auth |
| def multifactor_recovery_regen(self, **kw): |
| if not asbool(config.get('auth.multifactor.totp', False)): |
| raise wexc.HTTPNotFound |
| |
| recovery = RecoveryCodeService.get() |
| recovery.regenerate_codes(c.user) |
| email_body = g.jinja2_env.get_template('allura:templates/mail/twofactor_recovery_regen.md').render(dict( |
| user=c.user, |
| config=config, |
| )) |
| h.auditlog_user('Regenerated multifactor recovery codes') |
| send_system_mail_to_user(c.user, u'Two-Factor Recovery Codes Regenerated', email_body) |
| tg.flash('Your recovery codes have been regenerated. Save the new codes!') |
| redirect('/auth/preferences/multifactor_recovery') |
| |
| |
| class UserInfoController(BaseController): |
| |
| def __init__(self, *args, **kwargs): |
| self.skills = UserSkillsController() |
| self.contacts = UserContactsController() |
| self.availability = UserAvailabilityController() |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/user_info.html') |
| def index(self, **kw): |
| provider = plugin.AuthenticationProvider.get(request) |
| menu = provider.account_navigation() |
| return dict(menu=menu) |
| |
| @expose() |
| @require_post() |
| @validate(F.change_personal_data_form, error_handler=index) |
| def change_personal_data(self, **kw): |
| require_authenticated() |
| c.user.set_pref('sex', kw['sex']) |
| c.user.set_pref('birthdate', kw.get('birthdate')) |
| localization = {'country': kw.get('country'), 'city': kw.get('city')} |
| c.user.set_pref('localization', localization) |
| c.user.set_pref('timezone', kw['timezone']) |
| |
| flash('Your personal data was successfully updated!') |
| redirect('.') |
| |
| |
| class UserSkillsController(BaseController): |
| |
| def __init__(self, category=None): |
| self.category = category |
| super(UserSkillsController, self).__init__() |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| @expose() |
| def _lookup(self, trove_cat_id, *remainder): |
| cat = M.TroveCategory.query.get(trove_cat_id=int(trove_cat_id)) |
| if not cat: |
| raise wexc.HTTPNotFound |
| return UserSkillsController(category=cat), remainder |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/user_skills.html') |
| def index(self, **kw): |
| l = [] |
| parents = [] |
| if kw.get('selected_category') is not None: |
| selected_skill = M.TroveCategory.query.get( |
| trove_cat_id=int(kw.get('selected_category'))) |
| elif self.category: |
| selected_skill = self.category |
| else: |
| l = M.TroveCategory.query.find( |
| dict(trove_parent_id=0, show_as_skill=True)).all() |
| selected_skill = None |
| if selected_skill: |
| l = [scat for scat in selected_skill.subcategories |
| if scat.show_as_skill] |
| temp_cat = selected_skill.parent_category |
| while temp_cat: |
| parents = [temp_cat] + parents |
| temp_cat = temp_cat.parent_category |
| provider = plugin.AuthenticationProvider.get(request) |
| menu = provider.account_navigation() |
| return dict( |
| skills_list=l, |
| selected_skill=selected_skill, |
| parents=parents, |
| menu=menu, |
| add_details_fields=(len(l) == 0)) |
| |
| @expose() |
| @require_post() |
| @validate(F.save_skill_form, error_handler=index) |
| def save_skill(self, **kw): |
| trove_id = int(kw.get('selected_skill')) |
| category = M.TroveCategory.query.get(trove_cat_id=trove_id) |
| |
| new_skill = dict( |
| category_id=category._id, |
| level=kw.get('level'), |
| comment=kw.get('comment')) |
| |
| s = [skill for skill in c.user.skills |
| if str(skill.category_id) != str(new_skill['category_id'])] |
| s.append(new_skill) |
| c.user.set_pref('skills', s) |
| flash('Your skills list was successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.remove_skill_form, error_handler=index) |
| def remove_skill(self, **kw): |
| trove_id = int(kw.get('categoryid')) |
| category = M.TroveCategory.query.get(trove_cat_id=trove_id) |
| |
| s = [skill for skill in c.user.skills |
| if str(skill.category_id) != str(category._id)] |
| c.user.set_pref('skills', s) |
| flash('Your skills list was successfully updated!') |
| redirect('.') |
| |
| |
| class UserContactsController(BaseController): |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/user_contacts.html') |
| def index(self, **kw): |
| provider = plugin.AuthenticationProvider.get(request) |
| menu = provider.account_navigation() |
| return dict(menu=menu) |
| |
| @expose() |
| @require_post() |
| @validate(F.add_socialnetwork_form, error_handler=index) |
| def add_social_network(self, **kw): |
| require_authenticated() |
| c.user.add_socialnetwork(kw['socialnetwork'], kw['accounturl']) |
| flash('Your personal contacts were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.remove_socialnetwork_form, error_handler=index) |
| def remove_social_network(self, **kw): |
| require_authenticated() |
| c.user.remove_socialnetwork(kw['socialnetwork'], kw['account']) |
| flash('Your personal contacts were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.add_telnumber_form, error_handler=index) |
| def add_telnumber(self, **kw): |
| require_authenticated() |
| c.user.add_telephonenumber(kw['newnumber']) |
| flash('Your personal contacts were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.remove_textvalue_form, error_handler=index) |
| def remove_telnumber(self, **kw): |
| require_authenticated() |
| c.user.remove_telephonenumber(kw['oldvalue']) |
| flash('Your personal contacts were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.add_website_form, error_handler=index) |
| def add_webpage(self, **kw): |
| require_authenticated() |
| c.user.add_webpage(kw['newwebsite']) |
| flash('Your personal contacts were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.remove_textvalue_form, error_handler=index) |
| def remove_webpage(self, **kw): |
| require_authenticated() |
| c.user.remove_webpage(kw['oldvalue']) |
| flash('Your personal contacts were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.skype_account_form, error_handler=index) |
| def skype_account(self, **kw): |
| require_authenticated() |
| c.user.set_pref('skypeaccount', kw['skypeaccount']) |
| flash('Your personal contacts were successfully updated!') |
| redirect('.') |
| |
| |
| class UserAvailabilityController(BaseController): |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/user_availability.html') |
| def index(self, **kw): |
| provider = plugin.AuthenticationProvider.get(request) |
| menu = provider.account_navigation() |
| return dict(menu=menu) |
| |
| @expose() |
| @require_post() |
| @validate(F.add_timeslot_form, error_handler=index) |
| def add_timeslot(self, **kw): |
| require_authenticated() |
| c.user.add_timeslot(kw['weekday'], kw['starttime'], kw['endtime']) |
| flash('Your availability timeslots were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.remove_timeslot_form, error_handler=index) |
| def remove_timeslot(self, **kw): |
| require_authenticated() |
| c.user.remove_timeslot(kw['weekday'], kw['starttime'], kw['endtime']) |
| flash('Your availability timeslots were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.add_inactive_period_form, error_handler=index) |
| def add_inactive_period(self, **kw): |
| require_authenticated() |
| c.user.add_inactive_period(kw['startdate'], kw['enddate']) |
| flash('Your inactivity periods were successfully updated!') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| @validate(F.remove_inactive_period_form, error_handler=index) |
| def remove_inactive_period(self, **kw): |
| require_authenticated() |
| c.user.remove_inactive_period(kw['startdate'], kw['enddate']) |
| flash('Your availability timeslots were successfully updated!') |
| redirect('.') |
| |
| |
| class SubscriptionsController(BaseController): |
| """ Gives users the ability to manage subscriptions to tools. """ |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/user_subs.html') |
| def index(self, **kw): |
| """ The subscription selection page in user preferences. |
| |
| Builds up a list of dictionaries, each containing subscription |
| information about a tool. |
| """ |
| c.form = F.subscription_form |
| c.revoke_access = F.oauth_revocation_form |
| |
| subscriptions = [] |
| mailboxes = list(M.Mailbox.query.find( |
| dict(user_id=c.user._id, is_flash=False))) |
| projects = dict( |
| (p._id, p) for p in M.Project.query.find(dict( |
| _id={'$in': [mb.project_id for mb in mailboxes]}))) |
| app_index = dict( |
| (ac._id, ac) for ac in M.AppConfig.query.find(dict( |
| _id={'$in': [mb.app_config_id for mb in mailboxes]}))) |
| |
| # Add the tools that are already subscribed to by the user. |
| for mb in mailboxes: |
| project = projects.get(mb.project_id, None) |
| app_config = app_index.get(mb.app_config_id, None) |
| if project is None: |
| mb.query.delete() |
| continue |
| if app_config is None: |
| continue |
| app = app_config.load() |
| if not app.has_notifications: |
| continue |
| |
| subscriptions.append(dict( |
| subscription_id=mb._id, |
| project_id=project._id, |
| app_config_id=mb.app_config_id, |
| project_name=project.name, |
| tool=app_config.options['mount_label'], |
| artifact_title=dict( |
| text='Everything' if mb.artifact_title == 'All artifacts' else mb.artifact_title, |
| href=mb.artifact_url), |
| topic=mb.topic, |
| type=mb.type, |
| frequency=mb.frequency.unit, |
| artifact=mb.artifact_index_id, |
| subscribed=True)) |
| |
| # Dictionary of all projects projects accessible based on a users credentials (user_roles). |
| my_projects = dict((p._id, p) for p in c.user.my_projects()) |
| |
| # Dictionary containing all tools (subscribed and un-subscribed). |
| my_tools = M.AppConfig.query.find(dict( |
| project_id={'$in': my_projects.keys()})) |
| |
| # Dictionary containing all the currently subscribed tools for a given user. |
| my_tools_subscriptions = dict( |
| (mb.app_config_id, mb) for mb in M.Mailbox.query.find(dict( |
| user_id=c.user._id, |
| project_id={'$in': projects.keys()}, |
| app_config_id={'$in': app_index.keys()}, |
| artifact_index_id=None))) |
| |
| # Add the remaining tools that are eligible for subscription. |
| for tool in my_tools: |
| if tool['_id'] in my_tools_subscriptions: |
| continue # We have already subscribed to this tool. |
| app = tool.load() |
| if not app.has_notifications: |
| continue |
| |
| subscriptions.append( |
| dict(tool_id=tool._id, |
| user_id=c.user._id, |
| project_id=tool.project_id, |
| project_name=my_projects[tool.project_id].name, |
| tool=tool.options['mount_label'], |
| artifact_title='Everything', |
| topic=None, |
| type=None, |
| frequency=None, |
| artifact=None)) |
| |
| subscriptions.sort(key=lambda d: (d['project_name'], d['tool'])) |
| provider = plugin.AuthenticationProvider.get(request) |
| menu = provider.account_navigation() |
| return dict( |
| subscriptions=subscriptions, |
| menu=menu) |
| |
| @h.vardec |
| @expose() |
| @require_post() |
| @validate(F.subscription_form, error_handler=index) |
| def update_subscriptions(self, subscriptions=None, email_format=None, **kw): |
| for s in subscriptions: |
| if s['subscribed']: |
| if s['tool_id'] and s['project_id']: |
| M.Mailbox.subscribe( |
| project_id=bson.ObjectId(s['project_id']), |
| app_config_id=bson.ObjectId(s['tool_id'])) |
| else: |
| if s['subscription_id'] is not None: |
| s['subscription_id'].delete() |
| if email_format: |
| c.user.set_pref('email_format', email_format) |
| |
| redirect(request.referer) |
| |
| |
| class OAuthController(BaseController): |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/oauth_applications.html') |
| def index(self, **kw): |
| c.form = F.oauth_application_form |
| consumer_tokens = M.OAuthConsumerToken.for_user(c.user) |
| access_tokens = M.OAuthAccessToken.for_user(c.user) |
| provider = plugin.AuthenticationProvider.get(request) |
| return dict( |
| menu=provider.account_navigation(), |
| consumer_tokens=consumer_tokens, |
| access_tokens=access_tokens, |
| ) |
| |
| @expose() |
| @require_post() |
| @validate(F.oauth_application_form, error_handler=index) |
| def register(self, application_name=None, application_description=None, **kw): |
| M.OAuthConsumerToken(name=application_name, |
| description=application_description) |
| flash('OAuth Application registered') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| def deregister(self, _id=None): |
| app = M.OAuthConsumerToken.query.get(_id=bson.ObjectId(_id)) |
| if app is None: |
| flash('Invalid app ID', 'error') |
| redirect('.') |
| if app.user_id != c.user._id: |
| flash('Invalid app ID', 'error') |
| redirect('.') |
| M.OAuthRequestToken.query.remove({'consumer_token_id': app._id}) |
| M.OAuthAccessToken.query.remove({'consumer_token_id': app._id}) |
| app.delete() |
| flash('Application deleted') |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| def generate_access_token(self, _id): |
| """ |
| Manually generate an OAuth access token for the given consumer. |
| |
| NB: Manually generated access tokens are bearer tokens, which are |
| less secure (since they rely only on the token, which is transmitted |
| with each request, unlike the access token secret). |
| """ |
| consumer_token = M.OAuthConsumerToken.query.get(_id=bson.ObjectId(_id)) |
| if consumer_token is None: |
| flash('Invalid app ID', 'error') |
| redirect('.') |
| if consumer_token.user_id != c.user._id: |
| flash('Invalid app ID', 'error') |
| redirect('.') |
| request_token = M.OAuthRequestToken( |
| consumer_token_id=consumer_token._id, |
| user_id=c.user._id, |
| callback='manual', |
| validation_pin=h.nonce(20), |
| is_bearer=True, |
| ) |
| M.OAuthAccessToken( |
| consumer_token_id=consumer_token._id, |
| request_token_id=c.user._id, |
| user_id=request_token.user_id, |
| is_bearer=True, |
| ) |
| redirect('.') |
| |
| @expose() |
| @require_post() |
| def revoke_access_token(self, _id): |
| access_token = M.OAuthAccessToken.query.get(_id=bson.ObjectId(_id)) |
| if access_token is None: |
| flash('Invalid token ID', 'error') |
| redirect('.') |
| if access_token.user_id != c.user._id: |
| flash('Invalid token ID', 'error') |
| redirect('.') |
| access_token.delete() |
| flash('Token revoked') |
| redirect('.') |
| |
| |
| class DisableAccountController(BaseController): |
| |
| def _check_security(self): |
| require_authenticated() |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/user_disable_account.html') |
| def index(self, **kw): |
| provider = plugin.AuthenticationProvider.get(request) |
| menu = provider.account_navigation() |
| my_projects = c.user.my_projects_by_role_name('Admin') |
| return { |
| 'menu': menu, |
| 'my_projects': my_projects, |
| 'form': F.disable_account_form, |
| } |
| |
| @expose() |
| @require_post() |
| @validate(F.disable_account_form, error_handler=index) |
| def do_disable(self, password): |
| provider = plugin.AuthenticationProvider.get(request) |
| provider.disable_user(c.user) |
| flash('Your account was successfully disabled!') |
| redirect('/') |