blob: abb5c8c59e09744b64ad40e9693ea487fd74c3eb [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Allura plugins for authentication and project registration
import re
import os
import logging
import subprocess
import string
import crypt
import random
from urllib2 import urlopen
from cStringIO import StringIO
from random import randint
from hashlib import sha256
from base64 import b64encode
from datetime import datetime, timedelta
import calendar
import json
import ldap
from ldap import modlist
except ImportError:
ldap = modlist = None
import pkg_resources
import tg
from tg import config, request, redirect, response
from pylons import tmpl_context as c, app_globals as g
from webob import exc
from bson.tz_util import FixedOffset
from paste.deploy.converters import asbool, asint
from ming.utils import LazyProperty
from ming.orm import state
from ming.orm import ThreadLocalORMSession, session
from allura.lib import helpers as h
from allura.lib import security
from allura.lib import exceptions as forge_exc
from allura.lib import utils
log = logging.getLogger(__name__)
class AuthenticationProvider(object):
An interface to provide authentication services for Allura.
To use a new provider, expose an entry point in
myprovider =
Then in your .ini file, set ``auth.method=myprovider``
forgotten_password_process = False
pwd_expired_allowed_urls = [
'/auth/pwd_expired', # form for changing password, must be first here
def __init__(self, request):
self.request = request
def get(cls, request):
'''returns the AuthenticationProvider instance for this request'''
result = cls._loaded_ep
except AttributeError:
method = config.get('auth.method', 'local')
result = cls._loaded_ep = g.entry_points['auth'][method]
return result(request)
def session(self):
return self.request.environ['beaker.session']
def authenticate_request(self):
from allura import model as M
username = self.session.get('username') or self.session.get('expired-username')
user = M.User.query.get(username=username)
if user is None:
return M.User.anonymous()
if user.disabled or user.pending:
return M.User.anonymous()
if not user.is_anonymous() and \
self.get_last_password_updated(user) > datetime.utcfromtimestamp(self.session.created) and \
user.get_tool_data('allura', 'pwd_reset_preserve_session') !=
log.debug('Session logged out: due to user %s pwd change %s > %s', user.username,
self.get_last_password_updated(user), datetime.utcfromtimestamp(self.session.created))
return M.User.anonymous()
if self.session.get('pwd-expired') and request.path not in self.pwd_expired_allowed_urls:
if self.request.environ['REQUEST_METHOD'] == 'GET':
return_to = self.request.environ['PATH_INFO']
if self.request.environ.get('QUERY_STRING'):
return_to += '?' + self.request.environ['QUERY_STRING']
location = tg.url(self.pwd_expired_allowed_urls[0], dict(return_to=return_to))
# Don't try to re-post; the body has been lost.
location = tg.url(self.pwd_expired_allowed_urls[0])
return user
def register_user(self, user_doc):
Register a user.
:param user_doc: a dict with 'username' and 'display_name'. Optionally 'password' and others
:rtype: :class:`User <allura.model.auth.User>`
raise NotImplementedError('register_user')
def _login(self):
Authorize a user, usually using ``self.request.params['username']`` and ``['password']``
:rtype: :class:`User <allura.model.auth.User>`
:raises: HTTPUnauthorized if user not found, or credentials are not valid
raise NotImplementedError('_login')
def login(self, user=None):
if user is None:
user = self._login()
if self.is_password_expired(user):
self.session['pwd-expired'] = True
self.session['expired-username'] = user.username
h.auditlog_user('Password expired', user=user)
self.session['username'] = user.username
if 'rememberme' in self.request.params:
remember_for = int(config.get('auth.remember_for', 365))
self.session['login_expires'] = datetime.utcnow() + timedelta(remember_for)
self.session['login_expires'] = True
g.zarkov_event('login', user=user)
# set a non-secure cookie with same expiration as session,
# so an http request can know if there is a related session on https
response.set_cookie('allura-loggedin', value='true',
expires=None if self.session['login_expires'] is True else self.session['login_expires'],
secure=False, httponly=True)
return user
except exc.HTTPUnauthorized:
def logout(self):
def validate_password(self, user, password):
'''Check that provided password matches actual user password
:rtype: bool
raise NotImplementedError('validate_password')
def disable_user(self, user, **kw):
'''Disable user account'''
raise NotImplementedError('disable_user')
def enable_user(self, user, **kw):
'''Enable user account'''
raise NotImplementedError('enable_user')
def activate_user(self, user, **kw):
'''Activate user after registration'''
raise NotImplementedError('activate_user')
def deactivate_user(self, user, **kw):
'''Deactivate user (== registation not confirmed)'''
raise NotImplementedError('deactivate_user')
def by_username(self, username):
Find a user by username.
:rtype: :class:`User <allura.model.auth.User>` or None
raise NotImplementedError('by_username')
def set_password(self, user, old_password, new_password):
Set a user's password.
A provider implementing this method should store the timestamp of this change, either
on ``user.last_password_updated`` or somewhere else that a custom ``get_last_password_updated`` method uses.
:param user: a :class:`User <allura.model.auth.User>`
:rtype: None
:raises: HTTPUnauthorized if old_password is not valid
raise NotImplementedError('set_password')
def upload_sshkey(self, username, pubkey):
Upload an SSH Key. Providers do not necessarily need to implement this.
:rtype: None
:raises: AssertionError with user message, upon any error
raise NotImplementedError('upload_sshkey')
def account_navigation(self):
return [
'tabid': 'account_user_prefs',
'title': 'Preferences',
'target': "/auth/preferences",
'alt': 'Manage Personal Preferences',
'tabid': 'account_user_info',
'title': 'Personal Info',
'target': "/auth/user_info",
'alt': 'Manage Personal Information',
'tabid': 'account_subscriptions',
'title': 'Subscriptions',
'target': "/auth/subscriptions",
'alt': 'Manage Subscription Preferences',
'tabid': 'account_oauth',
'title': 'OAuth',
'target': "/auth/oauth",
'alt': 'Manage OAuth Preferences',
def account_urls(self):
return {m['tabid']: m['target'] for m in self.account_navigation()}
def user_project_shortname(self, user):
:param user: a :class:`User <allura.model.auth.User>`
:rtype: str
raise NotImplementedError('user_project_shortname')
def user_by_project_shortname(self, shortname):
:param str: shortname
:rtype: user: a :class:`User <allura.model.auth.User>`
raise NotImplementedError('user_by_project_shortname')
def update_notifications(self, user):
raise NotImplementedError('update_notifications')
def user_registration_date(self, user):
Returns the date in which a user registered himself/herself on the forge.
:param user: a :class:`User <allura.model.auth.User>`
:rtype: :class:`datetime <datetime.datetime>`
raise NotImplementedError('user_registration_date')
def get_last_password_updated(self, user):
Returns the date when the user updated password for a last time.
:param user: a :class:`User <allura.model.auth.User>`
:rtype: :class:`datetime <datetime.datetime>`
raise NotImplementedError('get_last_password_updated')
def get_primary_email_address(self, user_record):
return user_record.get_pref('email_address') if user_record else None
def user_details(self, user):
'''Returns detailed information about user.
:param user: a :class:`User <allura.model.auth.User>`
return {}
def is_password_expired(self, user):
days = asint(config.get('auth.pwdexpire.days', 0))
before = asint(config.get('auth.pwdexpire.before', 0))
now = datetime.utcnow()
last_updated = self.get_last_password_updated(user)
if days and now - last_updated > timedelta(days=days):
return True
if before and last_updated < datetime.utcfromtimestamp(before):
return True
return False
def index_user(self, user):
"""Put here additional fields for user index in SOLR."""
return {}
def details_links(self, user):
'''Return list of pairs (url, label) with details
about the user.
Links will show up at admin user search page.
return [
('/nf/admin/user/%s' % user.username, 'Details/Edit'),
class LocalAuthenticationProvider(AuthenticationProvider):
Stores user passwords on the User model, in mongo. Uses per-user salt and
SHA-256 encryption.
forgotten_password_process = True
def register_user(self, user_doc):
from allura import model as M
u = M.User(**user_doc)
if 'password' in user_doc:
return u
def _login(self):
user = self.by_username(self.request.params['username'])
if not self._validate_password(user, self.request.params['password']):
raise exc.HTTPUnauthorized()
return user
def disable_user(self, user, **kw):
user.disabled = True
if kw.get('audit', True):
h.auditlog_user(u'Account disabled', user=user)
def enable_user(self, user, **kw):
user.disabled = False
if kw.get('audit', True):
h.auditlog_user(u'Account enabled', user=user)
def activate_user(self, user, **kw):
user.pending = False
if kw.get('audit', True):
h.auditlog_user('Account activated', user=user)
def deactivate_user(self, user, **kw):
user.pending = True
if kw.get('audit', True):
h.auditlog_user('Account changed to pending', user=user)
def validate_password(self, user, password):
return self._validate_password(user, password)
def _validate_password(self, user, password):
if user is None:
return False
if not user.password:
return False
salt = str(user.password[6:6 + user.SALT_LEN])
check = self._encode_password(password, salt)
if check != user.password:
return False
return True
def by_username(self, username):
from allura import model as M
un = re.escape(username)
un = un.replace(r'\_', '[-_]')
un = un.replace(r'\-', '[-_]')
rex = re.compile('^' + un + '$')
return M.User.query.get(username=rex, disabled=False, pending=False)
def set_password(self, user, old_password, new_password):
if old_password is not None and not self.validate_password(user, old_password):
raise exc.HTTPUnauthorized()
user.password = self._encode_password(new_password)
user.last_password_updated = datetime.utcnow()
def _encode_password(self, password, salt=None):
from allura import model as M
if salt is None:
salt = ''.join(chr(randint(1, 0x7f))
for i in xrange(M.User.SALT_LEN))
hashpass = sha256(salt + password.encode('utf-8')).digest()
return 'sha256' + salt + b64encode(hashpass)
def user_project_shortname(self, user):
return 'u/' + user.username.replace('_', '-')
def user_by_project_shortname(self, shortname):
from allura import model as M
return M.User.query.get(username=shortname, disabled=False, pending=False)
def update_notifications(self, user):
return ''
def user_registration_date(self, user):
if user._id:
return user._id.generation_time
return datetime.utcnow()
def get_last_password_updated(self, user):
d = user.last_password_updated
if d is None:
d = self.user_registration_date(user)
# _id.generation_time returns aware datetime (in UTC)
# but we're using naive UTC time everywhere
d = datetime.utcfromtimestamp(calendar.timegm(d.utctimetuple()))
return d
def index_user(self, user):
fields = super(LocalAuthenticationProvider, self).index_user(user)
return dict(user_registration_date_dt=self.user_registration_date(user), **fields)
def ldap_conn(who=None, cred=None):
Init & bind a connection with the given creds, or the admin creds if not
specified. Remember to unbind the connection when done.
con = ldap.initialize(config['auth.ldap.server'])
con.bind_s(who or config['auth.ldap.admin_dn'],
cred or config['auth.ldap.admin_password'])
return con
def ldap_user_dn(username):
'return a Distinguished Name for a given username'
if not username:
raise ValueError('Empty username')
return 'uid=%s,%s' % (
class LdapAuthenticationProvider(AuthenticationProvider):
forgotten_password_process = True
def register_user(self, user_doc):
from allura import model as M
result = M.User(**user_doc)
if asbool(config.get('auth.ldap.autoregister', True)):
if asbool(config.get('auth.allow_user_registration', True)):
raise Exception('You should not have both "auth.ldap.autoregister" and '
'"auth.allow_user_registration" set to true')
log.debug('LdapAuth: autoregister is true, so only creating the mongo '
'record (not creating ldap record)')
return result
# full registration into LDAP
uid = str(M.AuthGlobals.get_next_uid())
con = ldap_conn()
uname = user_doc['username'].encode('utf-8')
display_name = user_doc['display_name'].encode('utf-8')
ldif_u = modlist.addModlist(dict(
objectClass=['account', 'posixAccount'],
homeDirectory='/home/' + uname,
description='SCM user account'))
con.add_s(ldap_user_dn(user_doc['username']), ldif_u)
except ldap.ALREADY_EXISTS:
log.exception('Trying to create existing user %s', uname)
if asbool(config.get('auth.ldap.use_schroot', True)):
argv = ('schroot -d / -c %s -u root / init %s' % (
config['auth.ldap.schroot_name'], user_doc['username'])).split()
p = subprocess.Popen(
argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
rc = p.wait()
if rc != 0:
log.error('Error creating home directory for %s',
return result
def upload_sshkey(self, username, pubkey):
if not asbool(config.get('auth.ldap.use_schroot', True)):
raise NotImplementedError('SSH keys are not supported')
argv = ('schroot -d / -c %s -u root / upload %s' % (
config['auth.ldap.schroot_name'], username)).split() + [pubkey]
p = subprocess.Popen(
argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
rc = p.wait()
if rc != 0:
errmsg =
log.exception('Error uploading public SSH key for %s: %s',
username, errmsg)
assert False, errmsg
def _get_salt(self, length):
def random_char():
return random.choice(string.ascii_uppercase + string.digits)
return ''.join(random_char() for i in range(length))
def _encode_password(self, password, salt=None):
cfg_prefix = 'auth.ldap.password.'
salt_len = asint(config.get(cfg_prefix + 'salt_len', 16))
algorithm = config.get(cfg_prefix + 'algorithm', 6)
rounds = asint(config.get(cfg_prefix + 'rounds', 6000))
salt = self._get_salt(salt_len) if salt is None else salt
encrypted = crypt.crypt(
'$%s$rounds=%s$%s' % (algorithm, rounds, salt))
return '{CRYPT}%s' % encrypted
def by_username(self, username):
from allura import model as M
return M.User.query.get(username=username, disabled=False, pending=False)
def set_password(self, user, old_password, new_password):
dn = ldap_user_dn(user.username)
if old_password:
ldap_ident = dn
ldap_pass = old_password.encode('utf-8')
ldap_ident = ldap_pass = None
con = ldap_conn(ldap_ident, ldap_pass)
new_password = self._encode_password(new_password)
dn, [(ldap.MOD_REPLACE, 'userPassword', new_password)])
user.last_password_updated = datetime.utcnow()
raise exc.HTTPUnauthorized()
def _login(self):
if ldap is None:
raise Exception('The python-ldap package needs to be installed. '
'Run `pip install python-ldap` in your allura environment.')
from allura import model as M
username = str(self.request.params['username'])
except UnicodeEncodeError:
raise exc.HTTPBadRequest('Unicode is not allowed in usernames')
if not self._validate_password(username, self.request.params['password']):
raise exc.HTTPUnauthorized()
user = M.User.query.get(username=username)
if user is None:
if asbool(config.get('auth.ldap.autoregister', True)):
log.debug('LdapAuth: authorized user {} needs a mongo record registered. '
user = M.User.register({'username': username,
'display_name': LdapUserPreferencesProvider()._get_pref(username, 'display_name'),
log.debug('LdapAuth: no user {} found in local mongo'.format(username))
raise exc.HTTPUnauthorized()
elif user.disabled or user.pending:
log.debug('LdapAuth: user {} is disabled or pending in Allura'.format(username))
raise exc.HTTPUnauthorized()
return user
def validate_password(self, user, password):
'''by user'''
return self._validate_password(user.username, password)
def _validate_password(self, username, password):
'''by username'''
password = h.really_unicode(password).encode('utf-8')
ldap_user = ldap_user_dn(username)
except ValueError:
return False
con = ldap_conn(ldap_user, password)
return True
log.debug('LdapAuth: could not authenticate {}'.format(username), exc_info=True)
return False
def user_project_shortname(self, user):
return LocalAuthenticationProvider(None).user_project_shortname(user)
def user_by_project_shortname(self, shortname):
return LocalAuthenticationProvider(None).user_by_project_shortname(shortname)
def user_registration_date(self, user):
# could read this from an LDAP field?
return LocalAuthenticationProvider(None).user_registration_date(user)
def update_notifications(self, user):
return LocalAuthenticationProvider(None).update_notifications(user)
def disable_user(self, user, **kw):
return LocalAuthenticationProvider(None).disable_user(user, **kw)
def enable_user(self, user, **kw):
return LocalAuthenticationProvider(None).enable_user(user, **kw)
def activate_user(self, user, **kw):
return LocalAuthenticationProvider(None).activate_user(user, **kw)
def deactivate_user(self, user, **kw):
return LocalAuthenticationProvider(None).deactivate_user(user, **kw)
def get_last_password_updated(self, user):
return LocalAuthenticationProvider(None).get_last_password_updated(user)
class ProjectRegistrationProvider(object):
Project registration services for Allura. This is a full implementation
and the default. Extend this class with your own if you need to add more
To use a new provider, expose an entry point in
myprovider =
Then in your .ini file, set registration.method=myprovider
The provider should expose an attribute, `shortname_validator` which is
an instance of a FormEncode validator that validates project shortnames.
The `to_python()` method of the validator should accept a `check_allowed`
argument to indicate whether additional checks beyond correctness of the
name should be done, such as whether the name is already in use.
def __init__(self):
from allura.lib.widgets import forms
self.add_project_widget = forms.NeighborhoodAddProjectForm
self.shortname_validator = forms.NeighborhoodProjectShortNameValidator(
def get(cls):
from allura.lib import app_globals
method = config.get('registration.method', 'local')
return app_globals.Globals().entry_points['registration'][method]()
def suggest_name(self, project_name, neighborhood):
"""Return a suggested project shortname for the full ``project_name``.
Example: "My Great Project" -> "mygreatproject"
return re.sub("[^A-Za-z0-9]", "", project_name).lower()
def rate_limit(self, user, neighborhood):
"""Check the various config-defined project registration rate
limits, and if any are exceeded, raise ProjectRatelimitError.
if security.has_access(neighborhood, 'admin', user=user)():
# have to have the replace because, despite being UTC,
# the result from utcnow() is still offset-naive :-(
# maybe look into making the mongo connection offset-naive?
now = datetime.utcnow().replace(tzinfo=FixedOffset(0, 'UTC'))
project_count = len(list(user.my_projects()))
rate_limits = json.loads(config.get('project.rate_limits', '{}'))
for rate, count in rate_limits.items():
user_age = now - user._id.generation_time
user_age = (user_age.microseconds +
(user_age.seconds + user_age.days * 24 * 3600) * 10 ** 6) / 10 ** 6
if user_age < int(rate) and project_count >= count:
raise forge_exc.ProjectRatelimitError()
def phone_verified(self, user, neighborhood):
Check if user has completed phone verification.
Returns True if one of the following is true:
- phone verification is disabled
- :param user: has 'admin' access to :param neighborhood:
- :param user: is has 'admin' access for some project, which belongs
to :param neighborhood:
- phone is already verified for a :param user:
Otherwise returns False.
if not asbool(config.get('project.verify_phone')):
return True
if security.has_access(neighborhood, 'admin', user=user)():
return True
admin_in = [p for p in user.my_projects_by_role_name('Admin')
if p.neighborhood_id == neighborhood._id]
if len(admin_in) > 0:
return True
return bool(user.get_tool_data('phone_verification', 'number_hash'))
def verify_phone(self, user, number):
ok = {'status': 'ok'}
if not asbool(config.get('project.verify_phone')):
return ok
number = utils.clean_phone_number(number)
return g.phone_service.verify(number)
def check_phone_verification(self, user, request_id, pin, number_hash):
ok = {'status': 'ok'}
if not asbool(config.get('project.verify_phone')):
return ok
res = g.phone_service.check(request_id, pin)
if res.get('status') == 'ok':
user.set_tool_data('phone_verification', number_hash=number_hash)
msg = 'Phone verification succeeded. Hash: {}'.format(number_hash)
h.auditlog_user(msg, user=user)
msg = 'Phone verification failed. Hash: {}'.format(number_hash)
h.auditlog_user(msg, user=user)
return res
def register_neighborhood_project(self, neighborhood, users, allow_register=False):
from allura import model as M
shortname = '--init--'
name = 'Home Project for %s' %
p = M.Project(neighborhood_id=neighborhood._id,
'You can edit this description in the admin page'),
homepage_title = '# ' + name,
last_updated = datetime.utcnow(),
('Wiki', 'wiki', 'Wiki'),
('admin', 'admin', 'Admin')])
log.exception('Error registering project %s' % p)
if allow_register:
role_auth = M.ProjectRole.authenticated(p)
security.simple_grant(p.acl, role_auth._id, 'register')
return p
def register_project(self, neighborhood, shortname, project_name, user, user_project, private_project, apps=None):
'''Register a new project in the neighborhood. The given user will
become the project's superuser.
self.validate_project(neighborhood, shortname,
project_name, user, user_project, private_project)
return self._create_project(neighborhood, shortname, project_name, user, user_project, private_project, apps)
def validate_project(self, neighborhood, shortname, project_name, user, user_project, private_project):
Validate that a project can be registered, before it is
from allura import model as M
# Check for private project rights
if neighborhood.features['private_projects'] is False and private_project:
raise ValueError(
"You can't create private projects for %s neighborhood" %
# Check for project limit creation
nb_max_projects = neighborhood.get_max_projects()
if nb_max_projects is not None:
count = M.Project.query.find(dict(
if count >= nb_max_projects:
log.exception('Error registering project %s' % project_name)
raise forge_exc.ProjectOverlimitError()
self.rate_limit(user, neighborhood)
if not self.phone_verified(user, neighborhood):
raise forge_exc.ProjectPhoneVerificationError()
if user_project and shortname.startswith('u/'):
check_shortname = shortname.replace('u/', '', 1)
check_shortname = shortname
check_shortname, neighborhood=neighborhood)
p = M.Project.query.get(
shortname=shortname, neighborhood_id=neighborhood._id)
if p:
raise forge_exc.ProjectConflict(
'%s already exists in nbhd %s' % (shortname, neighborhood._id))
def index_project(self, project):
Put here additional fields given project should be indexed by SOLR.
return dict()
def _create_project(self, neighborhood, shortname, project_name, user, user_project, private_project, apps):
Actually create the project, no validation. This should not be called directly
under normal circumstances.
from allura import model as M
project_template = neighborhood.get_project_template()
p = M.Project(neighborhood_id=neighborhood._id,
'You can edit this description in the admin page'),
last_updated = datetime.utcnow(),
is_private_project=private_project or project_template.get(
'private', False),
apps=apps or [] if 'tools' in project_template else None)
# Setup defaults from neighborhood project template if applicable
offset = p.next_mount_point(include_hidden=True)
if 'groups' in project_template:
for obj in project_template['groups']:
name = obj.get('name')
permissions = set(obj.get('permissions', [])) & \
usernames = obj.get('usernames', [])
# Must provide a group name
if not name:
# If the group already exists, we'll add users to it,
# but we won't change permissions on the group
group = M.ProjectRole.by_name(name, project=p)
if not group:
# If creating a new group, *must* specify permissions
if not permissions:
group = M.ProjectRole(project_id=p._id, name=name)
p.acl += [M.ACE.allow(group._id, perm)
for perm in permissions]
for username in usernames:
guser = M.User.by_username(username)
if not (guser and guser._id):
pr = M.ProjectRole.by_user(guser, project=p, upsert=True)
if group._id not in pr.roles:
if 'tools' in project_template:
for i, tool in enumerate(project_template['tools'].keys()):
tool_config = project_template['tools'][tool]
tool_options = tool_config.get('options', {})
for k, v in tool_options.iteritems():
if isinstance(v, basestring):
tool_options[k] = \
p.__dict__.get('root_project', {}))
if p.app_instance(tool) is None:
app = p.install_app(tool,
ordinal=i + offset,
if tool == 'wiki':
from forgewiki import model as WM
text = tool_config.get('home_text',
'[[members limit=20]]\n[[download_button]]')
app_config_id=app.config._id).text = text
if 'tool_order' in project_template:
for i, tool in enumerate(project_template['tool_order']):
p.app_config(tool).options.ordinal = i
if 'labels' in project_template:
p.labels = project_template['labels']
if 'trove_cats' in project_template:
for trove_type in project_template['trove_cats'].keys():
troves = getattr(p, 'trove_%s' % trove_type)
for trove_id in project_template['trove_cats'][trove_type]:
if 'icon' in project_template:
icon_file = StringIO(
project_template['icon']['filename'], icon_file,
square=True, thumbnail_size=(48, 48),
thumbnail_meta=dict(project_id=p._id, category='icon'))
if user_project:
# Allow for special user-only tools
p._extra_tool_status = ['user']
# add user project informative text to home
from forgewiki import model as WM
home_app = p.app_instance('wiki')
home_page = WM.Page.query.get(app_config_id=home_app.config._id)
home_page.text = ("This is the personal project of %s."
" This project is created automatically during user registration"
" as an easy place to store personal data that doesn't need its own"
" project such as cloned repositories.") % user.display_name
# clear the RoleCache for the user so this project will
# be picked up by user.my_projects()
g.credentials.clear_user(user._id, None) # unnamed roles for this user
# named roles for this project + user
g.credentials.clear_user(user._id, p._id)
with h.push_config(c, project=p, user=user):
# have to add user to context, since this may occur inside auth code
# for user-project reg, and c.user isn't set yet
return p
def register_subproject(self, project, name, user, install_apps, project_name=None):
from allura import model as M
assert h.re_project_name.match(name), 'Invalid subproject shortname'
shortname = project.shortname + '/' + name
ordinal = int(project.ordered_mounts(include_hidden=True)
[-1]['ordinal']) + 1
sp = M.Project(
name=project_name or name,
with h.push_config(c, project=sp):
if install_apps:
sp.install_app('admin', 'admin', ordinal=1)
sp.install_app('search', 'search', ordinal=2)
return sp
def delete_project(self, project, user):
for sp in project.subprojects:
self.delete_project(sp, user)
project.deleted = True
def undelete_project(self, project, user):
project.deleted = False
for sp in project.subprojects:
self.undelete_project(sp, user)
def best_download_url(self, project):
'''This is the url needed to render a download button.
It should be overridden for your specific envirnoment'''
return None
def registration_date(self, project):
Return the datetime the project was created.
return project._id.generation_time
def details_links(self, project):
'''Return list of pairs (url, label) with details
about the project.
Links will show up at admin project search page
return [
(project.url() + 'admin/groups/', 'Members'),
(project.url() + 'admin/audit/', 'Audit Trail'),
class ThemeProvider(object):
Theme information for Allura. This is a full implementation
and the default. Extend this class with your own if you need to add more
To use a new provider, expose an entry point in
myprovider =
Then in your .ini file, set theme=mytheme
The variables referencing jinja template files can be changed to point at your
own jinja templates. Use the standard templates as a reference, you should
provide matching macros and block names.
For more information, see
:var icons: a dictionary of sized icons for each tool
master_template = 'allura:templates/jinja_master/master.html'
jinja_macros = 'allura:templates/jinja_master/theme_macros.html'
nav_menu = 'allura:templates/jinja_master/nav_menu.html'
top_nav = 'allura:templates/jinja_master/top_nav.html'
sidebar_menu = 'allura:templates/jinja_master/sidebar_menu.html'
icons = {
'subproject': {
24: 'images/ext_24.png',
32: 'images/ext_32.png',
48: 'images/ext_48.png'
def require(self):
g.register_theme_css('css/site_style.css', compress=False)
g.register_theme_css('css/allura.css', compress=False)
def register_ew_resources(cls, manager, name):
'theme/%s' % name,
os.path.join('nf', name)))
def href(self, href, theme_name=None):
Build a full URL for a given resource path
:param href: a path like ``css/site_style.css``
:param theme_name: defaults to current theme
:return: a full URL
if theme_name is None:
theme_name = config.get('theme', 'allura')
return g.resource_manager.absurl('theme/%s/%s' % (theme_name, href))
def personal_data_form(self):
:return: None, or an easywidgets Form to render on the user preferences page
from allura.lib.widgets.forms import PersonalDataForm
return PersonalDataForm()
def add_telnumber_form(self):
:return: None, or an easywidgets Form to render on the user preferences page to
allow adding a telephone number.
from allura.lib.widgets.forms import AddTelNumberForm
return AddTelNumberForm()
def add_website_form(self):
:return: None, or an easywidgets Form to render on the user preferences page to
allow adding a personal website url.
from allura.lib.widgets.forms import AddWebsiteForm
return AddWebsiteForm()
def skype_account_form(self):
:return: None, or an easywidgets Form to render on the user preferences page to
allow setting the user's Skype account.
from allura.lib.widgets.forms import SkypeAccountForm
return SkypeAccountForm()
def remove_textvalue_form(self):
:return: None, or an easywidgets Form to render on the user preferences page to
allow removing a single text value from a list.
from allura.lib.widgets.forms import RemoveTextValueForm
return RemoveTextValueForm()
def add_socialnetwork_form(self):
:return: None, or an easywidgets Form to render on the user preferences page to
allow adding a social network account.
from allura.lib.widgets.forms import AddSocialNetworkForm
return AddSocialNetworkForm(action='/auth/preferences/add_social_network')
def remove_socialnetwork_form(self):
:return: None, or an easywidgets Form to render on the user preferences page to
allow removing a social network account.
from allura.lib.widgets.forms import RemoveSocialNetworkForm
return RemoveSocialNetworkForm(action='/auth/preferences/remove_social_network')
def add_timeslot_form(self):
:return: None, or an easywidgets Form to render on the user preferences page
to allow creating a new availability timeslot
from allura.lib.widgets.forms import AddTimeSlotForm
return AddTimeSlotForm()
def remove_timeslot_form(self):
:return: None, or an easywidgets Form to render on the user preferences page
to remove a timeslot
from allura.lib.widgets.forms import RemoveTimeSlotForm
return RemoveTimeSlotForm()
def add_inactive_period_form(self):
:return: None, or an easywidgets Form to render on the user preferences page
to allow creating a new period of inactivity
from allura.lib.widgets.forms import AddInactivePeriodForm
return AddInactivePeriodForm()
def remove_inactive_period_form(self):
:return: None, or an easywidgets Form to render on the user preferences page
to allow removing an existing period of inactivity
from allura.lib.widgets.forms import RemoveInactivePeriodForm
return RemoveInactivePeriodForm()
def add_trove_category(self):
:return: None, or an easywidgets Form to render on the page to create a
new trove_category
from allura.lib.widgets.forms import AddTroveCategoryForm
return AddTroveCategoryForm(action='/categories/create')
def remove_trove_category(self):
:return: None, or an easywidgets Form to render on the page to remove
an existing trove_category
from allura.lib.widgets.forms import RemoveTroveCategoryForm
return RemoveTroveCategoryForm(action='/categories/remove')
def add_user_skill(self):
:return: None, or an easywidgets Form to render on the page to add a
new skill to a user profile
from allura.lib.widgets.forms import AddUserSkillForm
return AddUserSkillForm(action='/auth/user_info/skills/save_skill')
def select_subcategory_form(self):
:return: None, or an easywidgets Form to render on the page to add a
new skill to a user profile, allowing to select a category in
order to see its sub-categories
from allura.lib.widgets.forms import SelectSubCategoryForm
return SelectSubCategoryForm(action='/auth/user_info/skills/')
def remove_user_skill(self):
:return: None, or an easywidgets Form to render on the page to remove
an existing skill from a user profile
from allura.lib.widgets.forms import RemoveSkillForm
return RemoveSkillForm(action='/auth/user_info/skills/remove_skill')
def master(self):
return self.master_template
def get(cls):
name = config.get('theme', 'allura')
return g.entry_points['theme'][name]()
def app_icon_url(self, app, size):
"""returns the default icon for the given app (or non-app thing like 'subproject').
Takes an instance of class Application, or else a string.
Expected to be overriden by derived Themes.
if isinstance(app, unicode):
app = str(app)
if isinstance(app, str):
if app in self.icons and size in self.icons[app]:
return g.theme_href(self.icons[app][size])
elif app in g.entry_points['tool']:
return g.entry_points['tool'][app].icon_url(size)
return None
return app.icon_url(size)
def get_site_notification(self):
from pylons import request, response
from allura.model.notification import SiteNotification
note = SiteNotification.current()
if note is None:
return None
cookie = request.cookies.get('site-notification', '').split('-')
if len(cookie) == 3 and cookie[0] == str(note._id):
views = asint(cookie[1]) + 1
closed = asbool(cookie[2])
views = 1
closed = False
if closed or note.impressions > 0 and views > note.impressions:
return None
'-'.join(map(str, [note._id, views, closed])),
return note
class LocalProjectRegistrationProvider(ProjectRegistrationProvider):
class UserPreferencesProvider(object):
An interface for user preferences, like display_name and email_address
To use a new provider, expose an entry point in
myprefs =
Then in your .ini file, set user_prefs_storage.method=myprefs
def get(cls):
method = config.get('user_prefs_storage.method', 'local')
return g.entry_points['user_prefs'][method]()
def get_pref(self, user, pref_name):
:param user: a :class:`User <allura.model.auth.User>`
:param str pref_name:
:return: pref_value
:raises: AttributeError if pref_name not found
raise NotImplementedError('get_pref')
def set_pref(self, user, pref_name, pref_value):
:param user: a :class:`User <allura.model.auth.User>`
:param str pref_name:
:param pref_value:
raise NotImplementedError('set_pref')
def additional_urls(self):
Returns a mapping of additional routes for AuthProvider.
By default, scans the provider for @expose()ed methods, which are
added as pages with the same name as the method. Note that if you
want the new pages to show up in the menu on the various auth pages,
you will also need to add it to the list returned by
If you want to override this behavior, you can override this method
and manually return a mapping of `{page_name: handler, ...}`. Note,
however, that this could break future subclasses of your providers'
ability to extend the list.
For example: `{'newroute', newroute_handler}` will add 'newroute'
attribute to the auth controller, which will be set to `newroute_handler`.
`newroute_handler` can either be an @expose()ed method, or a controller
that can dispatch further sub-pages.
`newroute_handler` must be decorated with @expose(), but does not have
to live on the provider.
urls = {}
for attr_name in dir(self):
attr_value = getattr(self, attr_name)
decoration = getattr(attr_value, 'decoration', None)
if getattr(decoration, 'exposed', False):
urls[attr_name] = attr_value
return urls
class LocalUserPreferencesProvider(UserPreferencesProvider):
The default UserPreferencesProvider, storing preferences on the User object
in mongo.
def get_pref(self, user, pref_name):
if pref_name in user.preferences:
return user.preferences[pref_name]
elif pref_name == 'display_name':
# get the value directly from ming's internals, bypassing
# FieldPropertyDisplayName which always calls back to this get_pref
# method (infinite recursion)
return user.__dict__['__ming__'].state.document.display_name
return getattr(user, pref_name)
def set_pref(self, user, pref_name, pref_value):
if pref_name in user.preferences:
user.preferences[pref_name] = pref_value
setattr(user, pref_name, pref_value)
class LdapUserPreferencesProvider(UserPreferencesProvider):
Store preferences in LDAP, falling back to LocalUserPreferencesProvider
def fields(self):
return h.config_with_prefix(config, 'user_prefs_storage.ldap.fields.')
def get_pref(self, user, pref_name):
from allura import model as M
if pref_name in self.fields and user != M.User.anonymous():
self._get_pref(user.username, pref_name)
return LocalUserPreferencesProvider().get_pref(user, pref_name)
def _get_pref(self, username, pref_name):
con = ldap_conn()
rs = con.search_s(ldap_user_dn(username), ldap.SCOPE_BASE)
except ldap.NO_SUCH_OBJECT:
rs = []
if not rs:
log.warning('LdapUserPref: No user record found for: {}'.format(username))
return ''
user_dn, user_attrs = rs[0]
ldap_attr = self.fields[pref_name]
# assume single-valued list
return user_attrs[ldap_attr][0].decode('utf-8')
def set_pref(self, user, pref_name, pref_value):
if pref_name in self.fields:
con = ldap_conn()
ldap_attr = self.fields[pref_name]
[(ldap.MOD_REPLACE, ldap_attr, pref_value.encode('utf-8'))])
return LocalUserPreferencesProvider().set_pref(user, pref_name, pref_value)
class AdminExtension(object):
A base class for extending the admin areas in Allura.
After extending this, expose the app by adding an entry point in your
myadmin =
:ivar dict project_admin_controllers: Mapping of str (url component) to
Controllers. Can be implemented as a ``@property`` function. The str
url components will be mounted at /p/someproject/admin/ext/STR/ and will
invoke the Controller.
project_admin_controllers = {}
def update_project_sidebar_menu(self, sidebar_links):
Implement this function to modify the project sidebar.
Check `c.project` if you want to limit when this displays
(e.g. nbhd project, subproject, etc)
:param sidebar_links: project admin side bar links
:type sidebar_links: list of :class:``
:rtype: ``None``
class SiteAdminExtension(object):
A base class for extending the site admin area in Allura.
After extending this, expose the extension by adding an entry point in your
myext =
:ivar dict controllers: Mapping of str (url component) to
Controllers. Can be implemented as a ``@property`` function. The str
url components will be mounted at /nf/admin/STR/ and will
invoke the Controller.
controllers = {}
def update_sidebar_menu(self, sidebar_links):
Change the site admin sidebar by modifying ``sidebar_links``.
:param sidebar_links: site admin side bar links
:type sidebar_links: list of :class:``
:rtype: ``None``
class ImportIdConverter(object):
An interface to convert to and from import_id values for indexing,
searching, or displaying.
To provide a new converter, expose an entry point in
mysource =
Then in your .ini file, set import_id_converter=mysource
def get(cls):
converter = config.get('import_id_converter')
if converter:
return g.entry_points['allura.import_id_converter'][converter]()
return cls()
def simplify(self, import_id):
if hasattr(import_id, 'get'):
return import_id.get('source_id')
return None
def expand(self, source_id, app_instance):
import_id = {
'source_id': source_id,
import_id.update(app_instance.config.options.get('import_id', {}))
return import_id