blob: 26b058f3a691a78c73829e538899135bac3dd86d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from urlparse import urlparse
import json
from operator import itemgetter
import pkg_resources
from pylons import tmpl_context as c, app_globals as g
from pylons import request
from paste.deploy.converters import asbool, aslist
from tg import expose, redirect, flash, validate, config, jsonify
from tg.decorators import with_trailing_slash, without_trailing_slash
from webob import exc
from bson import ObjectId
from ming.orm.ormsession import ThreadLocalORMSession
from ming.odm import session
from allura.app import Application, DefaultAdminController, SitemapEntry
from allura.lib import helpers as h
from allura import version
from allura import model as M
from allura.lib.repository import RepositoryApp
from allura.lib.security import has_access, require_access
from allura.lib.widgets import form_fields as ffw
from allura.lib import exceptions as forge_exc
from allura.lib import plugin
from allura.controllers import BaseController
from allura.lib.decorators import require_post
from allura.tasks import export_tasks
from allura.lib.widgets.project_list import ProjectScreenshots
from . import widgets as aw
log = logging.getLogger(__name__)
class W:
markdown_editor = ffw.MarkdownEdit()
label_edit = ffw.LabelEdit()
mount_delete = ffw.Lightbox(name='mount_delete', trigger='a.mount_delete')
admin_modal = ffw.Lightbox(name='admin_modal', trigger='a.admin_modal')
install_modal = ffw.Lightbox(
name='install_modal', trigger='a.install_trig')
explain_export_modal = ffw.Lightbox(
name='explain_export', trigger='#why_export')
group_card = aw.GroupCard()
permission_card = aw.PermissionCard()
group_settings = aw.GroupSettings()
new_group_settings = aw.NewGroupSettings()
screenshot_admin = aw.ScreenshotAdmin()
screenshot_list = ProjectScreenshots(draggable=True)
metadata_admin = aw.MetadataAdmin()
audit = aw.AuditLog()
page_list = ffw.PageList()
class AdminApp(Application):
'''This is the admin app. It is pretty much required for
a functioning allura project.
'''
__version__ = version.__version__
_installable_tools = None
max_instances = 0
tool_label = 'admin'
icons = {
24: 'images/admin_24.png',
32: 'images/admin_32.png',
48: 'images/admin_48.png'
}
exportable = True
def __init__(self, project, config):
Application.__init__(self, project, config)
self.root = ProjectAdminController()
self.api_root = ProjectAdminRestController()
self.admin = AdminAppAdminController(self)
self.templates = pkg_resources.resource_filename(
'allura.ext.admin', 'templates')
self.sitemap = [SitemapEntry('Admin', '.')]
def is_visible_to(self, user):
'''Whether the user can view the app.'''
return has_access(c.project, 'create')(user=user)
@staticmethod
def installable_tools_for(project):
tools = []
for name, App in g.entry_points['tool'].iteritems():
cfg = M.AppConfig(project_id=project._id, tool_name=name)
app = App(project, cfg)
if app.installable:
tools.append(dict(name=name, app=App))
# prevent from saving temporary config to db
session(cfg).expunge(cfg)
tools.sort(key=lambda t: (t['app'].status_int(), t['app'].ordinal))
return [t for t in tools
if t['app'].status in project.allowed_tool_status]
@staticmethod
def exportable_tools_for(project):
tools = []
for tool in project.app_configs:
if project.app_instance(tool).exportable:
tools.append(tool)
return sorted(tools, key=lambda t: t.options.mount_point)
def main_menu(self):
'''Apps should provide their entries to be added to the main nav
:return: a list of :class:`SitemapEntries <allura.app.SitemapEntry>`
'''
return [SitemapEntry('Admin', '.')]
@h.exceptionless([], log)
def sidebar_menu(self):
links = []
admin_url = c.project.url() + 'admin/'
if c.project.is_nbhd_project:
links.append(SitemapEntry('Add Project', c.project.url()
+ 'add_project', ui_icon=g.icons['plus']))
nbhd_admin_url = c.project.neighborhood.url() + '_admin/'
links = links + [
SitemapEntry('Neighborhood'),
SitemapEntry('Overview', nbhd_admin_url + 'overview'),
SitemapEntry('Awards', nbhd_admin_url + 'accolades')]
else:
links += [SitemapEntry('Metadata', admin_url + 'overview'), ]
if c.project.neighborhood.name != "Users":
links += [
SitemapEntry('Screenshots', admin_url + 'screenshots'),
SitemapEntry('Categorization', admin_url + 'trove')
]
links.append(SitemapEntry('Tools', admin_url + 'tools'))
if asbool(config.get('bulk_export_enabled', True)):
links.append(SitemapEntry('Export', admin_url + 'export'))
if c.project.is_root and has_access(c.project, 'admin')():
links.append(
SitemapEntry('User Permissions', admin_url + 'groups/'))
if not c.project.is_root and has_access(c.project, 'admin')():
links.append(
SitemapEntry('Permissions', admin_url + 'permissions/'))
if len(c.project.neighborhood_invitations):
links.append(
SitemapEntry('Invitation(s)', admin_url + 'invitations'))
links.append(SitemapEntry('Audit Trail', admin_url + 'audit/'))
if c.project.is_nbhd_project:
links.append(SitemapEntry('Statistics', nbhd_admin_url + 'stats/'))
links.append(None)
links.append(SitemapEntry('Help', nbhd_admin_url + 'help/'))
for ep_name in sorted(g.entry_points['admin'].keys()):
admin_extension = g.entry_points['admin'][ep_name]
admin_extension().update_project_sidebar_menu(links)
return links
def admin_menu(self):
return []
def install(self, project):
pass
def bulk_export(self, f):
json.dump(self.project, f, cls=jsonify.GenericJSON, indent=2)
class AdminExtensionLookup(object):
@expose()
def _lookup(self, name, *remainder):
for ep_name in sorted(g.entry_points['admin'].keys()):
admin_extension = g.entry_points['admin'][ep_name]
controller = admin_extension().project_admin_controllers.get(name)
if controller:
return controller(), remainder
raise exc.HTTPNotFound, name
class ProjectAdminController(BaseController):
def _check_security(self):
require_access(c.project, 'admin')
def __init__(self):
self.permissions = PermissionsController()
self.groups = GroupsController()
self.audit = AuditController()
self.ext = AdminExtensionLookup()
@with_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_admin.html')
def index(self, **kw):
scm_tools = []
for tool in c.project.app_configs:
app = g.entry_points["tool"].get(tool.tool_name)
if app and issubclass(app, RepositoryApp):
scm_tools.append(tool)
return dict(scm_tools=scm_tools)
@without_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_invitations.html')
def invitations(self):
return dict()
@without_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_overview.html')
def overview(self, **kw):
c.markdown_editor = W.markdown_editor
c.metadata_admin = W.metadata_admin
c.explain_export_modal = W.explain_export_modal
show_export_control = asbool(config.get('show_export_control', False))
allow_project_delete = asbool(config.get('allow_project_delete', True))
explain_export_text = '''The purpose of this section is to determine whether your project is subject to the
provisions of the US Export Administration Regulations. You should consult section 734.4 and Supplement 2
to Part 734 for information on such items and the calculation of U.S. controlled content.
<a href="http://www.bis.doc.gov/encryption/default.htm" target="_blank">
http://www.bis.doc.gov/encryption/default.htm</a>'''
if 'us_export_contact' in config:
explain_export_text += \
'If you have additional questions, ' \
'please contact <a href="mailto:{contact}">{contact}</a>.'.format(contact=config['us_export_contact'])
return dict(show_export_control=show_export_control,
allow_project_delete=allow_project_delete,
explain_export_text=explain_export_text)
@without_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_screenshots.html')
def screenshots(self, **kw):
c.screenshot_admin = W.screenshot_admin
c.screenshot_list = W.screenshot_list
return dict()
@without_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_trove.html')
def trove(self):
c.label_edit = W.label_edit
base_troves = M.TroveCategory.query.find(
dict(trove_parent_id=0)).sort('fullname').all()
topic_trove = M.TroveCategory.query.get(
trove_parent_id=0, shortname='topic')
license_trove = M.TroveCategory.query.get(
trove_parent_id=0, shortname='license')
return dict(base_troves=base_troves, license_trove=license_trove, topic_trove=topic_trove)
@without_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_tools.html')
def tools(self, page=None, limit=200, **kw):
c.markdown_editor = W.markdown_editor
c.label_edit = W.label_edit
c.mount_delete = W.mount_delete
c.admin_modal = W.admin_modal
c.install_modal = W.install_modal
c.page_list = W.page_list
mounts = c.project.ordered_mounts()
total_mounts = len(mounts)
limit, page = h.paging_sanitizer(limit, page or total_mounts / int(limit), total_mounts)
start = page * limit
return dict(
page=page,
limit=limit,
total_mounts=total_mounts,
mounts=mounts[start:start + limit],
installable_tools=AdminApp.installable_tools_for(c.project),
roles=M.ProjectRole.query.find(
dict(project_id=c.project.root_project._id)).sort('_id').all(),
categories=M.ProjectCategory.query.find(dict(parent_id=None)).sort('label').all())
@expose()
@require_post()
def configure_tool_grouping(self, grouping_threshold='1', page=0, limit=200, **kw):
try:
grouping_threshold = int(grouping_threshold)
if grouping_threshold < 1:
raise ValueError('Invalid threshold')
c.project.set_tool_data(
'allura', grouping_threshold=grouping_threshold)
except ValueError:
flash('Invalid threshold', 'error')
redirect('tools?limit=%s&page=%s' % (limit, page))
@expose()
@require_post()
def update_labels(self, labels=None, **kw):
require_access(c.project, 'admin')
c.project.labels = labels.split(',')
M.AuditLog.log('updated labels')
redirect('trove')
@without_trailing_slash
@expose()
def clone(self,
repo_type=None, source_url=None,
mount_point=None, mount_label=None,
**kw):
require_access(c.project, 'admin')
if repo_type is None:
return (
'<form method="get">'
'<input name="repo_type" value="Git">'
'<input name="source_url">'
'<input type="submit">'
'</form>')
for ep in h.iter_entry_points('allura', repo_type):
break
if ep is None or source_url is None:
raise exc.HTTPNotFound
h.log_action(log, 'install tool').info(
'clone repo from %s', source_url,
meta=dict(tool_type=repo_type, mount_point=mount_point, mount_label=mount_label))
c.project.install_app(
repo_type,
mount_point=mount_point,
mount_label=mount_label,
init_from_url=source_url)
M.AuditLog.log('Create repo as clone')
redirect('tools')
@without_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_permissions.html')
def groups(self, **kw):
return dict()
@expose()
def _lookup(self, name, *remainder):
app = c.project.app_instance(name)
if app is None:
raise exc.HTTPNotFound, name
return app.admin, remainder
@expose()
@require_post()
@validate(W.metadata_admin, error_handler=overview)
def update(self, name=None,
short_description=None,
summary='',
icon=None,
category=None,
external_homepage='',
video_url='',
support_page='',
support_page_url='',
twitter_handle='',
facebook_page='',
removal='',
moved_to_url='',
export_controlled=False,
export_control_type=None,
tracking_id='',
**kw):
require_access(c.project, 'update')
if removal != c.project.removal:
M.AuditLog.log('change project removal status to %s', removal)
h.log_action(log, 'change project removal status').info('')
c.project.removal = removal
c.project.removal_changed_date = datetime.utcnow()
if 'delete_icon' in kw:
M.ProjectFile.query.remove(
dict(project_id=c.project._id, category='icon'))
M.AuditLog.log('remove project icon')
h.log_action(log, 'remove project icon').info('')
g.post_event('project_updated')
redirect('overview')
elif 'delete' in kw:
allow_project_delete = asbool(
config.get('allow_project_delete', True))
if allow_project_delete or not c.project.is_root:
M.AuditLog.log('delete project')
h.log_action(log, 'delete project').info('')
plugin.ProjectRegistrationProvider.get().delete_project(
c.project, c.user)
redirect('overview')
elif 'undelete' in kw:
h.log_action(log, 'undelete project').info('')
M.AuditLog.log('undelete project')
plugin.ProjectRegistrationProvider.get().undelete_project(
c.project, c.user)
redirect('overview')
if name and name != c.project.name:
h.log_action(log, 'change project name').info('')
M.AuditLog.log('change project name to %s', name)
c.project.name = name
if short_description != c.project.short_description:
h.log_action(log, 'change project short description').info('')
M.AuditLog.log('change short description to %s', short_description)
c.project.short_description = short_description
if summary != c.project.summary:
h.log_action(log, 'change project summary').info('')
M.AuditLog.log('change summary to %s', summary)
c.project.summary = summary
category = category and ObjectId(category) or None
if category != c.project.category_id:
h.log_action(log, 'change project category').info('')
M.AuditLog.log('change category to %s', category)
c.project.category_id = category
if external_homepage != c.project.external_homepage:
h.log_action(log, 'change external home page').info('')
M.AuditLog.log('change external home page to %s',
external_homepage)
c.project.external_homepage = external_homepage
if video_url != c.project.video_url:
h.log_action(log, 'change video url').info('')
M.AuditLog.log('change video url to %s', video_url)
c.project.video_url = video_url
if support_page != c.project.support_page:
h.log_action(log, 'change project support page').info('')
M.AuditLog.log('change project support page to %s', support_page)
c.project.support_page = support_page
old_twitter = c.project.social_account('Twitter')
if not old_twitter or twitter_handle != old_twitter.accounturl:
h.log_action(log, 'change project twitter handle').info('')
M.AuditLog.log('change project twitter handle to %s',
twitter_handle)
c.project.set_social_account('Twitter', twitter_handle)
old_facebook = c.project.social_account('Facebook')
if not old_facebook or facebook_page != old_facebook.accounturl:
if not facebook_page or 'facebook.com' in urlparse(facebook_page).netloc:
h.log_action(log, 'change project facebook page').info('')
M.AuditLog.log(
'change project facebook page to %s', facebook_page)
c.project.set_social_account('Facebook', facebook_page)
if support_page_url != c.project.support_page_url:
h.log_action(log, 'change project support page url').info('')
M.AuditLog.log('change project support page url to %s',
support_page_url)
c.project.support_page_url = support_page_url
if moved_to_url != c.project.moved_to_url:
h.log_action(log, 'change project moved to url').info('')
M.AuditLog.log('change project moved to url to %s', moved_to_url)
c.project.moved_to_url = moved_to_url
export_controlled = asbool(export_controlled)
if export_controlled != c.project.export_controlled:
h.log_action(
log, 'change project export controlled status').info('')
M.AuditLog.log(
'change project export controlled status to %s', export_controlled)
c.project.export_controlled = export_controlled
if not export_controlled:
export_control_type = None
if export_control_type != c.project.export_control_type:
h.log_action(log, 'change project export control type').info('')
M.AuditLog.log('change project export control type to %s',
export_control_type)
c.project.export_control_type = export_control_type
if tracking_id != c.project.tracking_id:
h.log_action(log, 'change project tracking ID').info('')
M.AuditLog.log('change project tracking ID to %s', tracking_id)
c.project.tracking_id = tracking_id
if icon is not None and icon != '':
if c.project.icon:
M.ProjectFile.remove(
dict(project_id=c.project._id, category='icon'))
M.AuditLog.log('update project icon')
M.ProjectFile.save_image(
icon.filename, icon.file, content_type=icon.type,
square=True, thumbnail_size=(48, 48),
thumbnail_meta=dict(project_id=c.project._id, category='icon'))
g.post_event('project_updated')
flash('Saved', 'success')
redirect('overview')
def _add_trove(self, type, new_trove):
current_troves = getattr(c.project, 'trove_%s' % type)
trove_obj = M.TroveCategory.query.get(trove_cat_id=int(new_trove))
error_msg = None
if type in ['license', 'audience', 'developmentstatus', 'language'] and len(current_troves) >= 6:
error_msg = 'You may not have more than 6 of this category.'
elif type in ['topic'] and len(current_troves) >= 3:
error_msg = 'You may not have more than 3 of this category.'
elif trove_obj is not None:
if trove_obj._id not in current_troves:
current_troves.append(trove_obj._id)
M.AuditLog.log('add trove %s: %s', type, trove_obj.fullpath)
# just in case the event handling is super fast
ThreadLocalORMSession.flush_all()
c.project.last_updated = datetime.utcnow()
g.post_event('project_updated')
else:
error_msg = 'This category has already been assigned to the project.'
return (trove_obj, error_msg)
@expose('json:')
@require_post()
def add_trove_js(self, type, new_trove, **kw):
require_access(c.project, 'update')
trove_obj, error_msg = self._add_trove(type, new_trove)
return dict(trove_full_path=trove_obj.fullpath, trove_cat_id=trove_obj.trove_cat_id, error_msg=error_msg)
@expose()
@require_post()
def add_trove(self, type, new_trove, **kw):
require_access(c.project, 'update')
trove_obj, error_msg = self._add_trove(type, new_trove)
if error_msg:
flash(error_msg, 'error')
redirect('trove')
@expose()
@require_post()
def delete_trove(self, type, trove, **kw):
require_access(c.project, 'update')
trove_obj = M.TroveCategory.query.get(trove_cat_id=int(trove))
current_troves = getattr(c.project, 'trove_%s' % type)
if trove_obj is not None and trove_obj._id in current_troves:
M.AuditLog.log('remove trove %s: %s', type, trove_obj.fullpath)
current_troves.remove(trove_obj._id)
# just in case the event handling is super fast
ThreadLocalORMSession.flush_all()
c.project.last_updated = datetime.utcnow()
g.post_event('project_updated')
redirect('trove')
@expose()
@require_post()
@validate(W.screenshot_admin)
def add_screenshot(self, screenshot=None, caption=None, **kw):
require_access(c.project, 'update')
screenshots = c.project.get_screenshots()
if len(screenshots) >= 6:
flash('You may not have more than 6 screenshots per project.',
'error')
elif screenshot is not None and screenshot != '':
M.AuditLog.log('add screenshot')
sort = 1 + max([ss.sort or 0 for ss in screenshots] or [0])
M.ProjectFile.save_image(
screenshot.filename, screenshot.file, content_type=screenshot.type,
save_original=True,
original_meta=dict(
project_id=c.project._id,
category='screenshot',
caption=caption,
sort=sort),
square=True, thumbnail_size=(150, 150),
thumbnail_meta=dict(project_id=c.project._id, category='screenshot_thumb'))
g.post_event('project_updated')
redirect('screenshots')
@expose()
@require_post()
def sort_screenshots(self, **kw):
"""Sort project screenshots.
Called via ajax when screenshots are reordered via drag/drop on
the Screenshots admin page.
``kw`` is a mapping of (screenshot._id, sort_order) pairs.
"""
for s in c.project.get_screenshots():
if str(s._id) in kw:
s.sort = int(kw[str(s._id)])
g.post_event('project_updated')
@expose()
@require_post()
def delete_screenshot(self, id=None, **kw):
require_access(c.project, 'update')
if id is not None and id != '':
M.AuditLog.log('remove screenshot')
M.ProjectFile.query.remove(
dict(project_id=c.project._id, _id=ObjectId(id)))
g.post_event('project_updated')
redirect('screenshots')
@expose()
@require_post()
def edit_screenshot(self, id=None, caption=None, **kw):
require_access(c.project, 'update')
if id is not None and id != '':
M.ProjectFile.query.get(
project_id=c.project._id, _id=ObjectId(id)).caption = caption
g.post_event('project_updated')
redirect('screenshots')
@expose()
@require_post()
def join_neighborhood(self, nid):
require_access(c.project, 'admin')
if not nid:
n = M.Neighborhood.query.get(name='Projects')
c.project.neighborhood_id = n._id
flash('Joined %s' % n.name)
redirect(c.project.url() + 'admin/')
nid = ObjectId(str(nid))
if nid not in c.project.neighborhood_invitations:
flash('No invitation to that neighborhood', 'error')
redirect('.')
c.project.neighborhood_id = nid
n = M.Neighborhood.query.get(_id=nid)
flash('Joined %s' % n.name)
redirect('invitations')
@h.vardec
@expose()
@require_post()
def update_mount_order(self, subs=None, tools=None, **kw):
if subs:
for sp in subs:
p = M.Project.query.get(shortname=sp['shortname'],
neighborhood_id=c.project.neighborhood_id)
p.ordinal = int(sp['ordinal'])
if tools:
for p in tools:
c.project.app_config(
p['mount_point']).options.ordinal = int(p['ordinal'])
redirect('tools')
def _update_mounts(self, subproject=None, tool=None, new=None, **kw):
if subproject is None:
subproject = []
if tool is None:
tool = []
for sp in subproject:
p = M.Project.query.get(shortname=sp['shortname'],
neighborhood_id=c.project.neighborhood_id)
if sp.get('delete'):
require_access(c.project, 'admin')
M.AuditLog.log('delete subproject %s', sp['shortname'])
h.log_action(log, 'delete subproject').info(
'delete subproject %s', sp['shortname'],
meta=dict(name=sp['shortname']))
p.removal = 'deleted'
plugin.ProjectRegistrationProvider.get().delete_project(
p, c.user)
elif not new:
M.AuditLog.log('update subproject %s', sp['shortname'])
p.name = sp['name']
p.ordinal = int(sp['ordinal'])
for p in tool:
if p.get('delete'):
require_access(c.project, 'admin')
M.AuditLog.log('uninstall tool %s', p['mount_point'])
h.log_action(log, 'uninstall tool').info(
'uninstall tool %s', p['mount_point'],
meta=dict(mount_point=p['mount_point']))
c.project.uninstall_app(p['mount_point'])
elif not new:
M.AuditLog.log('update tool %s', p['mount_point'])
options = c.project.app_config(p['mount_point']).options
options.mount_label = p['mount_label']
options.ordinal = int(p['ordinal'])
if new and new.get('install'):
ep_name = new.get('ep_name', None)
if not ep_name:
require_access(c.project, 'create')
mount_point = new['mount_point'].lower() or h.nonce()
M.AuditLog.log('create subproject %s', mount_point)
h.log_action(log, 'create subproject').info(
'create subproject %s', mount_point,
meta=dict(mount_point=mount_point, name=new['mount_label']))
sp = c.project.new_subproject(mount_point)
sp.name = new['mount_label']
sp.ordinal = int(new['ordinal'])
else:
require_access(c.project, 'admin')
installable_tools = AdminApp.installable_tools_for(c.project)
if not ep_name.lower() in [t['name'].lower() for t in installable_tools]:
flash('Installation limit exceeded.', 'error')
return
mount_point = new['mount_point'] or ep_name
M.AuditLog.log('install tool %s', mount_point)
h.log_action(log, 'install tool').info(
'install tool %s', mount_point,
meta=dict(tool_type=ep_name, mount_point=mount_point, mount_label=new['mount_label']))
return c.project.install_app(
ep_name, mount_point, mount_label=new['mount_label'], ordinal=new['ordinal'])
g.post_event('project_updated')
@h.vardec
@expose()
@require_post()
def update_mounts(self, subproject=None, tool=None, new=None, page=0, limit=200, **kw):
try:
new_app = self._update_mounts(subproject, tool, new, **kw)
if new_app:
# force redir to last page of tools, where new app will be
page = ''
except forge_exc.ForgeError, exc:
flash('%s: %s' % (exc.__class__.__name__, exc.args[0]),
'error')
redirect('tools?limit=%s&page=%s' % (limit, page))
@expose('jinja:allura.ext.admin:templates/export.html')
def export(self, tools=None):
if not asbool(config.get('bulk_export_enabled', True)):
raise exc.HTTPNotFound()
if request.method == 'POST':
try:
ProjectAdminRestController().export(tools, send_email=True)
except (exc.HTTPBadRequest, exc.HTTPServiceUnavailable) as e:
flash(str(e), 'error')
redirect('.')
else:
flash(
'Export scheduled. You will recieve an email with download instructions when complete.', 'ok')
redirect('export')
exportable_tools = AdminApp.exportable_tools_for(c.project)
return {
'tools': exportable_tools,
'status': c.project.bulk_export_status()
}
class ProjectAdminRestController(BaseController):
"""
Exposes RESTful APi for project admin actions.
"""
def _check_security(self):
require_access(c.project, 'admin')
@expose('json:')
@require_post()
def export(self, tools=None, send_email=False, **kw):
"""
Initiate a bulk export of the project data.
Must be given a list of tool mount points to include in the export.
The list can either be comma-separated or a repeated param, e.g.,
`export?tools=tickets&tools=discussion`.
If the tools are not provided, an invalid mount point is listed, or
there is some other problems with the arguments, a `400 Bad Request`
response will be returned.
If an export is already currently running for this project, a
`503 Unavailable` response will be returned.
Otherwise, a JSON object of the form
`{"status": "in progress", "filename": FILENAME}` will be returned,
where `FILENAME` is the filename of the export artifact relative to
the users shell account directory.
"""
if not asbool(config.get('bulk_export_enabled', True)):
raise exc.HTTPNotFound()
if not tools:
raise exc.HTTPBadRequest(
'Must give at least one tool mount point to export')
tools = aslist(tools, ',')
exportable_tools = AdminApp.exportable_tools_for(c.project)
allowed = set(t.options.mount_point for t in exportable_tools)
if not set(tools).issubset(allowed):
raise exc.HTTPBadRequest('Invalid tool')
if c.project.bulk_export_status() == 'busy':
raise exc.HTTPServiceUnavailable(
'Export for project %s already running' % c.project.shortname)
# filename (potentially) includes a timestamp, so we have
# to pre-generate to be able to return it to the user
filename = c.project.bulk_export_filename()
export_tasks.bulk_export.post(tools, filename, send_email=send_email)
return {
'status': 'in progress',
'filename': filename,
}
@expose('json:')
def export_status(self, **kw):
"""
Check the status of a bulk export.
Returns an object containing only one key, `status`, whose value is
either `'busy'` or `'ready'`.
"""
status = c.project.bulk_export_status()
return {'status': status or 'ready'}
@expose('json:')
@require_post()
def install_tool(self, tool=None, mount_point=None, mount_label=None, order=None, **kw):
"""API for installing tools in current project.
Requires a valid tool, mount point and mount label names.
(All arguments are required.)
Usage example::
POST to:
/rest/p/testproject/admin/install_tool/
with params:
{
'tool': 'tickets',
'mount_point': 'mountpoint',
'mount_label': 'mountlabel',
'order': 'first|last|alpha_tool'
}
Example output (in successful case)::
{
"info": "Tool tickets with mount_point mountpoint and mount_label mountlabel was created.",
"success": true
}
"""
controller = ProjectAdminController()
if not tool or not mount_point or not mount_label:
return {
'success': False,
'info': 'All arguments required.'
}
installable_tools = AdminApp.installable_tools_for(c.project)
tools_names = [t['name'] for t in installable_tools]
if not (tool in tools_names):
return {
'success': False,
'info': 'Incorrect tool name, or limit is reached.'
}
if c.project.app_instance(mount_point) is not None:
return {
'success': False,
'info': 'Mount point already exists.',
}
if order is None:
order = 'last'
mounts = [{'ordinal': ac.options.ordinal,
'label': ac.options.mount_label,
'mount': ac.options.mount_point,
'type': ac.tool_name.lower()}
for ac in c.project.app_configs]
subs = {p.shortname: p for p in M.Project.query.find({'parent_id': c.project._id})}
for sub in subs.values():
mounts.append({'ordinal': sub.ordinal,
'mount': sub.shortname,
'type': 'sub-project'})
mounts.sort(key=itemgetter('ordinal'))
if order == 'first':
ordinal = 0
elif order == 'last':
ordinal = len(mounts)
elif order == 'alpha_tool':
tool = tool.lower()
for i, mount in enumerate(mounts):
if mount['type'] == tool and mount['label'] > mount_label:
ordinal = i
break
else:
ordinal = len(mounts)
mounts.insert(ordinal, {'ordinal': ordinal, 'type': 'new'})
for i, mount in enumerate(mounts):
if mount['type'] == 'new':
pass
elif mount['type'] == 'sub-project':
subs[mount['mount']].ordinal = i
else:
c.project.app_config(mount['mount']).options.ordinal = i
data = {
'install': 'install',
'ep_name': tool,
'ordinal': ordinal,
'mount_point': mount_point,
'mount_label': mount_label
}
try:
controller._update_mounts(new=data)
except forge_exc.ForgeError as e:
return {
'success': False,
'info': str(e),
}
return {
'success': True,
'info': 'Tool %s with mount_point %s and mount_label %s was created.'
% (tool, mount_point, mount_label)
}
class PermissionsController(BaseController):
def _check_security(self):
require_access(c.project, 'admin')
@with_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_permissions.html')
def index(self, **kw):
c.card = W.permission_card
return dict(permissions=self._index_permissions())
@without_trailing_slash
@expose()
@h.vardec
@require_post()
def update(self, card=None, **kw):
permissions = self._index_permissions()
old_permissions = dict(permissions)
for args in card:
perm = args['id']
new_group_ids = args.get('new', [])
group_ids = args.get('value', [])
if isinstance(new_group_ids, basestring):
new_group_ids = [new_group_ids]
if isinstance(group_ids, basestring):
group_ids = [group_ids]
# make sure the admin group has the admin permission
if perm == 'admin':
if c.project.is_root:
pid = c.project._id
else:
pid = c.project.parent_id
admin_group_id = str(
M.ProjectRole.query.get(project_id=pid, name='Admin')._id)
if admin_group_id not in group_ids + new_group_ids:
flash(
'You cannot remove the admin group from the admin permission.', 'warning')
group_ids.append(admin_group_id)
permissions[perm] = []
role_ids = map(ObjectId, group_ids + new_group_ids)
permissions[perm] = role_ids
c.project.acl = []
for perm, role_ids in permissions.iteritems():
role_names = lambda ids: ','.join(sorted(
pr.name for pr in M.ProjectRole.query.find(dict(_id={'$in': ids}))))
old_role_ids = old_permissions.get(perm, [])
if old_role_ids != role_ids:
M.AuditLog.log('updated "%s" permissions: "%s" => "%s"',
perm, role_names(old_role_ids), role_names(role_ids))
c.project.acl += [M.ACE.allow(rid, perm) for rid in role_ids]
g.post_event('project_updated')
redirect('.')
def _index_permissions(self):
permissions = dict(
(p, []) for p in c.project.permissions)
for ace in c.project.acl:
if ace.access == M.ACE.ALLOW:
permissions[ace.permission].append(ace.role_id)
return permissions
class GroupsController(BaseController):
def _check_security(self):
require_access(c.project, 'admin')
def _index_permissions(self):
permissions = dict(
(p, []) for p in c.project.permissions)
for ace in c.project.acl:
if ace.access == M.ACE.ALLOW:
permissions[ace.permission].append(ace.role_id)
return permissions
def _map_group_permissions(self):
roles = c.project.named_roles
permissions = self._index_permissions()
permissions_by_role = dict()
auth_role = M.ProjectRole.authenticated()
anon_role = M.ProjectRole.anonymous()
for role in roles + [auth_role, anon_role]:
permissions_by_role[str(role._id)] = []
for perm in permissions:
perm_info = dict(has="no", text="Does not have permission %s" %
perm, name=perm)
role_ids = permissions[perm]
if role._id in role_ids:
perm_info['text'] = "Has permission %s" % perm
perm_info['has'] = "yes"
else:
for r in role.child_roles():
if r._id in role_ids:
perm_info['text'] = "Inherited permission %s from %s" % (
perm, r.name)
perm_info['has'] = "inherit"
break
if perm_info['has'] == "no":
if anon_role._id in role_ids:
perm_info[
'text'] = "Inherited permission %s from Anonymous" % perm
perm_info['has'] = "inherit"
elif auth_role._id in role_ids and role != anon_role:
perm_info[
'text'] = "Inherited permission %s from Authenticated" % perm
perm_info['has'] = "inherit"
permissions_by_role[str(role._id)].append(perm_info)
return permissions_by_role
@without_trailing_slash
@expose()
@h.vardec
def delete_group(self, group_name, **kw):
role = M.ProjectRole.by_name(group_name)
if not role:
flash('Group "%s" does not exist.' % group_name, 'error')
else:
role.delete()
M.AuditLog.log('delete group %s', group_name)
flash('Group "%s" deleted successfully.' % group_name)
g.post_event('project_updated')
redirect('.')
@with_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_groups.html')
def index(self, **kw):
c.admin_modal = W.admin_modal
c.card = W.group_card
permissions_by_role = self._map_group_permissions()
auth_role = M.ProjectRole.authenticated()
anon_role = M.ProjectRole.anonymous()
roles = c.project.named_roles
roles.append(None)
return dict(roles=roles, permissions_by_role=permissions_by_role,
auth_role=auth_role, anon_role=anon_role)
@without_trailing_slash
@expose('json:')
@require_post()
@h.vardec
def change_perm(self, role_id, permission, allow="true", **kw):
if allow == "true":
M.AuditLog.log('granted permission %s to group %s', permission,
M.ProjectRole.query.get(_id=ObjectId(role_id)).name)
c.project.acl.append(M.ACE.allow(ObjectId(role_id), permission))
else:
admin_group_id = str(M.ProjectRole.by_name('Admin')._id)
if admin_group_id == role_id and permission == 'admin':
return dict(error='You cannot remove the admin permission from the admin group.')
M.AuditLog.log('revoked permission %s from group %s', permission,
M.ProjectRole.query.get(_id=ObjectId(role_id)).name)
c.project.acl.remove(M.ACE.allow(ObjectId(role_id), permission))
g.post_event('project_updated')
return self._map_group_permissions()
@without_trailing_slash
@expose('json:')
@require_post()
@h.vardec
def add_user(self, role_id, username, **kw):
if not username or username == '*anonymous':
return dict(error='You must choose a user to add.')
group = M.ProjectRole.query.get(_id=ObjectId(role_id))
user = M.User.by_username(username.strip())
if not group:
return dict(error='Could not find group with id %s' % role_id)
if not user:
return dict(error='User %s not found' % username)
user_role = M.ProjectRole.by_user(user, upsert=True)
if group._id in user_role.roles:
return dict(error='%s (%s) is already in the group %s.' % (user.display_name, username, group.name))
M.AuditLog.log('add user %s to %s', username, group.name)
user_role.roles.append(group._id)
if group.name == 'Admin':
for ac in c.project.app_configs:
c.project.app_instance(ac).subscribe(user)
g.post_event('project_updated')
return dict(username=username, displayname=user.display_name)
@without_trailing_slash
@expose('json:')
@require_post()
@h.vardec
def remove_user(self, role_id, username, **kw):
group = M.ProjectRole.query.get(_id=ObjectId(role_id))
user = M.User.by_username(username.strip())
if group.name == 'Admin' and len(group.users_with_role()) == 1:
return dict(error='You must have at least one user with the Admin role.')
if not group:
return dict(error='Could not find group with id %s' % role_id)
if not user:
return dict(error='User %s not found' % username)
user_role = M.ProjectRole.by_user(user)
if not user_role or group._id not in user_role.roles:
return dict(error='%s (%s) is not in the group %s.' % (user.display_name, username, group.name))
M.AuditLog.log('remove user %s from %s', username, group.name)
user_role.roles.remove(group._id)
g.post_event('project_updated')
return dict()
@without_trailing_slash
@expose()
@require_post()
@h.vardec
def update(self, card=None, **kw):
for pr in card:
group = M.ProjectRole.query.get(_id=ObjectId(pr['id']))
assert group.project == c.project, 'Security violation'
user_ids = pr.get('value', [])
new_users = pr.get('new', [])
if isinstance(user_ids, basestring):
user_ids = [user_ids]
if isinstance(new_users, basestring):
new_users = [new_users]
# Handle new users in groups
user_added = False
for username in new_users:
user = M.User.by_username(username.strip())
if not user:
flash('User %s not found' % username, 'error')
redirect('.')
if not user._id:
continue # never add anon users to groups
M.AuditLog.log('add user %s to %s', username, group.name)
M.ProjectRole.by_user(
user, upsert=True).roles.append(group._id)
user_added = True
# Make sure we aren't removing all users from the Admin group
if group.name == u'Admin' and not (user_ids or user_added):
flash('You must have at least one user with the Admin role.',
'warning')
redirect('.')
# Handle users removed from groups
user_ids = set(
uid and ObjectId(uid)
for uid in user_ids)
for role in M.ProjectRole.query.find(dict(user_id={'$ne': None}, roles=group._id)):
if role.user_id and role.user_id not in user_ids:
role.roles = [
rid for rid in role.roles if rid != group._id]
M.AuditLog.log('remove user %s from %s',
role.user.username, group.name)
g.post_event('project_updated')
redirect('.')
@without_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_group.html')
def new(self):
c.form = W.new_group_settings
return dict(
group=None,
show_settings=True,
action="create")
@expose()
@require_post()
@validate(W.new_group_settings)
@h.vardec
def create(self, name=None, **kw):
if M.ProjectRole.by_name(name):
flash('%s already exists' % name, 'error')
else:
M.ProjectRole(project_id=c.project._id, name=name)
M.AuditLog.log('create group %s', name)
g.post_event('project_updated')
redirect('.')
@expose()
def _lookup(self, name, *remainder):
return GroupController(name), remainder
class GroupController(BaseController):
def __init__(self, name):
self._group = M.ProjectRole.query.get(_id=ObjectId(name))
@with_trailing_slash
@expose('jinja:allura.ext.admin:templates/project_group.html')
def index(self):
if self._group.name in ('Admin', 'Developer', 'Member'):
show_settings = False
action = None
else:
show_settings = True
action = self._group.settings_href + 'update'
c.form = W.group_settings
return dict(
group=self._group,
show_settings=show_settings,
action=action)
@expose()
@h.vardec
@require_post()
@validate(W.group_settings)
def update(self, _id=None, delete=None, name=None, **kw):
pr = M.ProjectRole.by_name(name)
if pr and pr._id != _id._id:
flash('%s already exists' % name, 'error')
redirect('..')
if delete:
_id.delete()
M.AuditLog.log('delete group %s', _id.name)
flash('%s deleted' % name)
redirect('..')
M.AuditLog.log('update group name %s=>%s', _id.name, name)
_id.name = name
flash('%s updated' % name)
redirect('..')
class AuditController(BaseController):
@with_trailing_slash
@expose('jinja:allura.ext.admin:templates/audit.html')
def index(self, limit=25, page=0, **kwargs):
limit = int(limit)
page = int(page)
count = M.AuditLog.query.find(dict(project_id=c.project._id)).count()
q = M.AuditLog.query.find(dict(project_id=c.project._id))
q = q.sort('timestamp', -1)
q = q.skip(page * limit)
if count > limit:
q = q.limit(limit)
else:
limit = count
c.widget = W.audit
return dict(
entries=q.all(),
limit=limit,
page=page,
count=count)
class AdminAppAdminController(DefaultAdminController):
'''Administer the admin app'''
pass