| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import re |
| import logging |
| from datetime import datetime, timedelta |
| import pipes |
| |
| from tg import expose, validate, flash, config, redirect |
| from tg.decorators import with_trailing_slash, without_trailing_slash |
| import bson |
| import tg |
| from paste.deploy.converters import aslist |
| from tg import app_globals as g |
| from tg import tmpl_context as c |
| from tg import request |
| from formencode import validators, Invalid |
| from webob.exc import HTTPNotFound, HTTPFound |
| from ming.odm import ThreadLocalODMSession |
| import paginate |
| |
| from allura.app import SitemapEntry |
| from allura.lib import helpers as h |
| from allura.lib import validators as v |
| from allura.lib.decorators import require_post |
| from allura.lib.plugin import SiteAdminExtension, ProjectRegistrationProvider, AuthenticationProvider |
| from allura.lib import search |
| from allura.lib.security import require_site_admin, Credentials |
| from allura.lib.widgets import form_fields as ffw |
| from allura.ext.admin.widgets import AuditLog |
| from allura.lib.widgets import forms |
| from allura import model as M |
| from allura.command.show_models import dfs, build_model_inheritance_graph |
| from allura.scripts.delete_projects import DeleteProjects |
| import allura |
| |
| from urllib.parse import urlparse |
| import six |
| |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class W: |
| page_list = ffw.PageList() |
| page_size = ffw.PageSize() |
| audit = AuditLog() |
| admin_search_form = forms.AdminSearchForm |
| |
| |
| class SiteAdminController: |
| |
| def __init__(self): |
| self.task_manager = TaskManagerController() |
| self.user = AdminUserDetailsController() |
| self.delete_projects = DeleteProjectsController() |
| self.site_notifications = SiteNotificationController() |
| |
| def _check_security(self): |
| require_site_admin(c.user) |
| |
| c.site_admin_sidebar_menu = self.sidebar_menu() |
| |
| @expose() |
| def _lookup(self, name, *remainder): |
| for ep_name in sorted(g.entry_points['site_admin'].keys()): |
| admin_extension = g.entry_points['site_admin'][ep_name] |
| controller = admin_extension().controllers.get(name) |
| if controller: |
| return controller(), remainder |
| raise HTTPNotFound(name) |
| |
| def sidebar_menu(self): |
| base_url = '/nf/admin/' |
| links = [ |
| SitemapEntry('Home', base_url, ui_icon=g.icons['admin']), |
| SitemapEntry('Add Subscribers', base_url + 'add_subscribers', ui_icon=g.icons['admin']), |
| SitemapEntry('New Projects', base_url + 'new_projects', ui_icon=g.icons['admin']), |
| SitemapEntry('Reclone Repo', base_url + 'reclone_repo', ui_icon=g.icons['admin']), |
| SitemapEntry('Task Manager', base_url + 'task_manager?state=busy', ui_icon=g.icons['stats']), |
| SitemapEntry('Search Projects', base_url + 'search_projects', ui_icon=g.icons['search']), |
| SitemapEntry('Delete Projects', base_url + 'delete_projects', ui_icon=g.icons['delete']), |
| SitemapEntry('Search Users', base_url + 'search_users', ui_icon=g.icons['search']), |
| SitemapEntry('Site Notifications', base_url + 'site_notifications', ui_icon=g.icons['admin']), |
| ] |
| for ep_name in sorted(g.entry_points['site_admin']): |
| g.entry_points['site_admin'][ep_name]().update_sidebar_menu(links) |
| return links |
| |
| @expose('jinja:allura:templates/site_admin_index.html') |
| @with_trailing_slash |
| def index(self, **kw): |
| return {} |
| |
| def subscribe_artifact(self, url, user): |
| artifact_url = urlparse(url).path[1:-1].split("/") |
| neighborhood = M.Neighborhood.query.find({ |
| "url_prefix": "/" + artifact_url[0] + "/"}).first() |
| |
| if artifact_url[0] == "u": |
| project = M.Project.query.find({ |
| "shortname": artifact_url[0] + "/" + artifact_url[1], |
| "neighborhood_id": neighborhood._id}).first() |
| else: |
| project = M.Project.query.find({ |
| "shortname": artifact_url[1], |
| "neighborhood_id": neighborhood._id}).first() |
| |
| appconf = M.AppConfig.query.find({ |
| "options.mount_point": artifact_url[2], |
| "project_id": project._id}).first() |
| |
| if appconf.url() == urlparse(url).path: |
| M.Mailbox.subscribe( |
| user_id=user._id, |
| app_config_id=appconf._id, |
| project_id=project._id) |
| return True |
| |
| tool_packages = h.get_tool_packages(appconf.tool_name) |
| classes = set() |
| for depth, cls in dfs(M.Artifact, build_model_inheritance_graph()): |
| for pkg in tool_packages: |
| if cls.__module__.startswith(pkg + '.'): |
| classes.add(cls) |
| for cls in classes: |
| for artifact in cls.query.find({"app_config_id": appconf._id}): |
| if artifact.url() == urlparse(url).path: |
| M.Mailbox.subscribe( |
| user_id=user._id, |
| app_config_id=appconf._id, |
| project_id=project._id, |
| artifact=artifact) |
| return True |
| return False |
| |
| @expose('jinja:allura:templates/site_admin_add_subscribers.html') |
| @without_trailing_slash |
| def add_subscribers(self, **data): |
| if request.method == 'POST': |
| url = data['artifact_url'] |
| user = M.User.by_username(data['for_user']) |
| if not user or user == M.User.anonymous(): |
| flash('Invalid login', 'error') |
| return data |
| |
| try: |
| ok = self.subscribe_artifact(url, user) |
| except Exception: |
| log.warning("Can't subscribe to artifact", exc_info=True) |
| ok = False |
| |
| if ok: |
| flash('User successfully subscribed to the artifact') |
| return {} |
| else: |
| flash('Artifact not found', 'error') |
| |
| return data |
| |
| @expose('jinja:allura:templates/site_admin_new_projects.html') |
| @without_trailing_slash |
| def new_projects(self, **kwargs): |
| start_dt = kwargs.pop('start-dt', '') |
| end_dt = kwargs.pop('end-dt', '') |
| try: |
| start_dt = datetime.strptime(start_dt, '%Y/%m/%d %H:%M:%S') |
| except ValueError: |
| start_dt = datetime.utcnow() + timedelta(days=1) |
| try: |
| end_dt = datetime.strptime(end_dt, '%Y/%m/%d %H:%M:%S') |
| except ValueError: |
| end_dt = end_dt if end_dt else start_dt - timedelta(days=3) |
| start = bson.ObjectId.from_datetime(start_dt) |
| end = bson.ObjectId.from_datetime(end_dt) |
| nb = M.Neighborhood.query.get(name='Users') |
| projects = (M.Project.query.find({ |
| 'neighborhood_id': {'$ne': nb._id}, |
| 'deleted': False, |
| '_id': {'$lt': start, '$gt': end}, |
| }).sort('_id', -1)).all() |
| # pre-populate roles cache, so we won't query mongo for roles for every project |
| # when getting admins with p.admins() in a template |
| Credentials.get().load_project_roles(*[p._id for p in projects]) |
| step = start_dt - end_dt |
| params = request.params.copy() |
| params['start-dt'] = (start_dt + step).strftime('%Y/%m/%d %H:%M:%S') |
| params['end-dt'] = (end_dt + step).strftime('%Y/%m/%d %H:%M:%S') |
| newer_url = tg.url(params=params).lstrip('/') |
| params['start-dt'] = (start_dt - step).strftime('%Y/%m/%d %H:%M:%S') |
| params['end-dt'] = (end_dt - step).strftime('%Y/%m/%d %H:%M:%S') |
| older_url = tg.url(params=params).lstrip('/') |
| return { |
| 'projects': projects, |
| 'newer_url': newer_url, |
| 'older_url': older_url, |
| 'window_start': start_dt, |
| 'window_end': end_dt, |
| } |
| |
| @expose('jinja:allura:templates/site_admin_reclone_repo.html') |
| @without_trailing_slash |
| @validate(dict(prefix=validators.NotEmpty(), |
| shortname=validators.NotEmpty(), |
| mount_point=validators.NotEmpty())) |
| def reclone_repo(self, prefix=None, shortname=None, mount_point=None, **data): |
| if request.method == 'POST': |
| if request.validation.errors: |
| error_msg = 'Error: ' |
| for msg in list(request.validation.errors): |
| names = {'prefix': 'Neighborhood prefix', 'shortname': |
| 'Project shortname', 'mount_point': 'Repository mount point'} |
| error_msg += f'{names[msg]}: {request.validation.errors[msg]} ' |
| flash(error_msg, 'error') |
| return dict(prefix=prefix, shortname=shortname, mount_point=mount_point) |
| nbhd = M.Neighborhood.query.get(url_prefix='/%s/' % prefix) |
| if not nbhd: |
| flash('Neighborhood with prefix %s not found' % |
| prefix, 'error') |
| return dict(prefix=prefix, shortname=shortname, mount_point=mount_point) |
| c.project = M.Project.query.get( |
| shortname=shortname, neighborhood_id=nbhd._id) |
| if not c.project: |
| flash( |
| 'Project with shortname %s not found in neighborhood %s' % |
| (shortname, nbhd.name), 'error') |
| return dict(prefix=prefix, shortname=shortname, mount_point=mount_point) |
| c.app = c.project.app_instance(mount_point) |
| if not c.app: |
| flash('Mount point %s not found on project %s' % |
| (mount_point, c.project.shortname), 'error') |
| return dict(prefix=prefix, shortname=shortname, mount_point=mount_point) |
| source_url = c.app.config.options.get('init_from_url') |
| source_path = c.app.config.options.get('init_from_path') |
| if not (source_url or source_path): |
| flash('%s does not appear to be a cloned repo' % |
| c.app, 'error') |
| return dict(prefix=prefix, shortname=shortname, mount_point=mount_point) |
| allura.tasks.repo_tasks.reclone_repo.post( |
| prefix=prefix, shortname=shortname, mount_point=mount_point) |
| flash('Repository is being recloned') |
| else: |
| prefix = 'p' |
| shortname = '' |
| mount_point = '' |
| return dict(prefix=prefix, shortname=shortname, mount_point=mount_point) |
| |
| def _search(self, model, fields, add_fields, q=None, f=None, page=0, limit=None, **kw): |
| all_fields = fields + [(fld, fld) for fld in add_fields] |
| c.search_form = W.admin_search_form(all_fields) |
| c.page_list = W.page_list |
| c.page_size = W.page_size |
| count = 0 |
| objects = [] |
| limit, page, start = g.handle_paging(limit, page, default=25) |
| if q: |
| if f in ('username', 'shortname'): |
| # these are always lowercase, so search by lowercase |
| q = q.lower() |
| match = search.site_admin_search(model, q, f, rows=limit, start=start) |
| if match: |
| count = match.hits |
| objects = match.docs |
| |
| ids = [obj['id'] for obj in objects] |
| mongo_objects = search.mapped_artifacts_from_index_ids(ids, model) |
| for i in range(len(objects)): |
| obj = objects[i] |
| _id = obj['id'].split('#')[1] |
| obj['object'] = mongo_objects.get(_id) |
| # Some objects can be deleted, but still have index in solr, should skip those |
| objects = [o for o in objects if o.get('object')] |
| |
| def convert_fields(obj): |
| # throw the type away (e.g. '_s' from 'url_s') |
| result = {} |
| for k,val in obj.items(): |
| name = k.rsplit('_', 1) |
| if len(name) == 2: |
| name = name[0] |
| else: |
| name = k |
| result[name] = val |
| return result |
| |
| return { |
| 'q': q, |
| 'f': f, |
| 'objects': list(map(convert_fields, objects)), |
| 'count': count, |
| 'page': page, |
| 'limit': limit, |
| 'fields': fields, |
| 'additional_fields': add_fields, |
| 'type_s': model.type_s, |
| } |
| |
| @without_trailing_slash |
| @expose('jinja:allura:templates/site_admin_search.html') |
| @validate(validators=dict(q=v.UnicodeString(if_empty=None), |
| limit=validators.Int(if_invalid=None), |
| page=validators.Int(if_empty=0, if_invalid=0))) |
| def search_projects(self, q=None, f=None, page=0, limit=None, **kw): |
| fields = [('shortname', 'shortname'), ('name', 'full name')] |
| add_fields = aslist(tg.config.get('search.project.additional_search_fields'), ',') |
| r = self._search(M.Project, fields, add_fields, q, f, page, limit, **kw) |
| r['search_results_template'] = 'allura:templates/site_admin_search_projects_results.html' |
| r['additional_display_fields'] = \ |
| aslist(tg.config.get('search.project.additional_display_fields'), ',') |
| r['provider'] = ProjectRegistrationProvider.get() |
| return r |
| |
| @without_trailing_slash |
| @expose('jinja:allura:templates/site_admin_search.html') |
| @validate(validators=dict(q=v.UnicodeString(if_empty=None), |
| limit=validators.Int(if_invalid=None), |
| page=validators.Int(if_empty=0, if_invalid=0))) |
| def search_users(self, q=None, f=None, page=0, limit=None, **kw): |
| fields = [('username', 'username'), ('display_name', 'display name')] |
| add_fields = aslist(tg.config.get('search.user.additional_search_fields'), ',') |
| r = self._search(M.User, fields, add_fields, q, f, page, limit, **kw) |
| r['objects'] = [dict(u, status=h.get_user_status(u['object'])) for u in r['objects']] |
| r['search_results_template'] = 'allura:templates/site_admin_search_users_results.html' |
| r['additional_display_fields'] = \ |
| aslist(tg.config.get('search.user.additional_display_fields'), ',') |
| r['provider'] = AuthenticationProvider.get(request) |
| return r |
| |
| |
| class DeleteProjectsController: |
| delete_form_validators = dict( |
| projects=v.UnicodeString(if_empty=None), |
| reason=v.UnicodeString(if_empty=None), |
| disable_users=validators.StringBool(if_empty=False)) |
| |
| def remove_comments(self, lines): |
| return [l.split('#', 1)[0] for l in lines] |
| |
| def parse_projects(self, projects): |
| """Takes projects from user input and returns a list of tuples (input, project, error)""" |
| provider = ProjectRegistrationProvider.get() |
| projects = projects.splitlines() |
| projects = self.remove_comments(projects) |
| parsed_projects = [] |
| for input in projects: |
| if input.strip(): |
| p, error = provider.project_from_url(input.strip()) |
| parsed_projects.append((input, p, error)) |
| return parsed_projects |
| |
| def format_parsed_projects(self, projects): |
| template = '{} # {}' |
| lines = [] |
| for input, p, error in projects: |
| comment = f'OK: {p.url()}' if p else error |
| lines.append(template.format(input, comment)) |
| return '\n'.join(lines) |
| |
| @with_trailing_slash |
| @expose('jinja:allura:templates/site_admin_delete_projects.html') |
| @validate(validators=delete_form_validators) |
| def index(self, projects=None, reason=None, disable_users=False, **kw): |
| return {'projects': projects, |
| 'reason': reason, |
| 'disable_users': disable_users} |
| |
| @expose('jinja:allura:templates/site_admin_delete_projects_confirm.html') |
| @require_post() |
| @without_trailing_slash |
| @validate(validators=delete_form_validators) |
| def confirm(self, projects=None, reason=None, disable_users=False, **kw): |
| if not projects: |
| flash('No projects specified', 'warning') |
| redirect('.') |
| parsed_projects = self.parse_projects(projects) |
| projects = self.format_parsed_projects(parsed_projects) |
| edit_link = './?projects={}&reason={}&disable_users={}'.format( |
| h.urlquoteplus(projects), |
| h.urlquoteplus(reason or ''), |
| h.urlquoteplus(disable_users)) |
| return {'projects': projects, |
| 'parsed_projects': parsed_projects, |
| 'edit_link': edit_link, |
| 'reason': reason, |
| 'disable_users': disable_users} |
| |
| @expose() |
| @require_post() |
| @without_trailing_slash |
| @validate(validators=delete_form_validators) |
| def really_delete(self, projects=None, reason=None, disable_users=False, **kw): |
| if not projects: |
| flash('No projects specified', 'warning') |
| redirect('.') |
| projects = self.parse_projects(projects) |
| task_params = [p.url().strip('/') for (_, p, _) in projects if p] |
| if not task_params: |
| flash('Unable to parse at least one project from your input', 'warning') |
| redirect('.') |
| task_params = ' '.join(task_params) |
| if reason: |
| task_params = f'-r {pipes.quote(reason)} {task_params}' |
| if disable_users: |
| task_params = f'--disable-users {task_params}' |
| DeleteProjects.post(task_params) |
| flash('Delete scheduled', 'ok') |
| redirect('.') |
| |
| |
| class SiteNotificationController: |
| |
| def __init__(self, note=None): |
| self.note = note |
| |
| @expose() |
| def _lookup(self, id, *remainder): |
| note = M.notification.SiteNotification.query.get(_id=bson.ObjectId(id)) |
| return SiteNotificationController(note=note), remainder |
| |
| @expose('jinja:allura:templates/site_admin_site_notifications_list.html') |
| @with_trailing_slash |
| def index(self, page=0, limit=25, **kw): |
| c.page_list = W.page_list |
| c.page_size = W.page_size |
| |
| limit, page = h.paging_sanitizer(limit, page) |
| |
| query = M.notification.SiteNotification.query.find().sort('_id', -1) |
| count = query.count() |
| notifications = paginate.Page(query.all(), page+1, limit) |
| |
| return { |
| 'notifications': notifications, |
| 'count': count, |
| 'page_url': page, |
| 'limit': limit |
| } |
| |
| @expose('jinja:allura:templates/site_admin_site_notifications_create_update.html') |
| @without_trailing_slash |
| def new(self, **kw): |
| """Render the New SiteNotification form""" |
| return dict( |
| form_errors=request.validation.errors or {}, |
| form_values=request.validation.values or {}, |
| form_title='New Site Notification', |
| form_action='create' |
| ) |
| |
| @expose() |
| @require_post() |
| @validate(v.CreateSiteNotificationSchema(), error_handler=new) |
| def create(self, impressions, content, user_role, page_regex, page_tool_type, active=False): |
| """Post a new note""" |
| M.notification.SiteNotification( |
| active=active, impressions=impressions, content=content, |
| user_role=user_role, page_regex=page_regex, |
| page_tool_type=page_tool_type) |
| ThreadLocalODMSession().flush_all() |
| redirect('../site_notifications') |
| |
| @expose('jinja:allura:templates/site_admin_site_notifications_create_update.html') |
| def edit(self, **kw): |
| if request.validation.values: |
| return dict( |
| form_errors=request.validation.errors or {}, |
| form_values=request.validation.values or {}, |
| form_title='Edit Site Notification', |
| form_action='update' |
| ) |
| form_value = {} |
| form_value['active'] = str(self.note.active) |
| form_value['impressions'] = self.note.impressions |
| form_value['content'] = self.note.content |
| form_value['user_role'] = self.note.user_role if self.note.user_role is not None else '' |
| form_value['page_regex'] = self.note.page_regex if self.note.page_regex is not None else '' |
| form_value['page_tool_type'] = self.note.page_tool_type if self.note.page_tool_type is not None else '' |
| return dict(form_errors={}, |
| form_values=form_value, |
| form_title='Edit Site Notification', |
| form_action='update') |
| |
| @expose() |
| @require_post() |
| @validate(v.CreateSiteNotificationSchema(), error_handler=edit) |
| def update(self, active=True, impressions=0, content='', user_role=None, page_regex=None, page_tool_type=None): |
| self.note.active = active |
| self.note.impressions = impressions |
| self.note.content = content |
| self.note.user_role = user_role |
| self.note.page_regex = page_regex |
| self.note.page_tool_type = page_tool_type |
| ThreadLocalODMSession().flush_all() |
| redirect('..') |
| |
| @expose() |
| @require_post() |
| def delete(self): |
| self.note.delete() |
| ThreadLocalODMSession().flush_all() |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| |
| class TaskManagerController: |
| |
| def _check_security(self): |
| require_site_admin(c.user) |
| |
| @expose('jinja:allura:templates/site_admin_task_list.html') |
| @without_trailing_slash |
| def index(self, page_num=1, minutes=10, state=None, task_name=None, host=None, **kw): |
| now = datetime.utcnow() |
| try: |
| page_num = int(page_num) |
| except ValueError: |
| page_num = 1 |
| try: |
| minutes = int(minutes) |
| except ValueError: |
| minutes = 1 |
| start_dt = now - timedelta(minutes=(page_num - 1) * minutes) |
| end_dt = now - timedelta(minutes=page_num * minutes) |
| start = bson.ObjectId.from_datetime(start_dt) |
| end = bson.ObjectId.from_datetime(end_dt) |
| query = {'_id': {'$gt': end}} |
| if page_num > 1: |
| query['_id']['$lt'] = start |
| if state: |
| query['state'] = state |
| if task_name: |
| query['task_name'] = {'$regex': re.escape(task_name)} |
| if host: |
| query['process'] = {'$regex': re.escape(host)} |
| |
| tasks = list(M.monq_model.MonQTask.query.find(query).sort('_id', -1)) |
| for task in tasks: |
| task.project = M.Project.query.get(_id=task.context.project_id) |
| task.user = M.User.query.get(_id=task.context.user_id) |
| newer_url = tg.url( |
| params=dict(request.params, page_num=page_num - 1)).lstrip('/') |
| older_url = tg.url( |
| params=dict(request.params, page_num=page_num + 1)).lstrip('/') |
| return dict( |
| tasks=tasks, |
| page_num=page_num, |
| minutes=minutes, |
| newer_url=newer_url, |
| older_url=older_url, |
| window_start=start_dt, |
| window_end=end_dt, |
| ) |
| |
| @expose('jinja:allura:templates/site_admin_task_view.html') |
| @without_trailing_slash |
| def view(self, task_id): |
| try: |
| task = M.monq_model.MonQTask.query.get(_id=bson.ObjectId(task_id)) |
| except bson.errors.InvalidId: |
| task = None |
| if task: |
| task.project = M.Project.query.get(_id=task.context.project_id) |
| task.app_config = M.AppConfig.query.get( |
| _id=task.context.app_config_id) |
| task.user = M.User.query.get(_id=task.context.user_id) |
| return dict(task=task) |
| |
| @expose('jinja:allura:templates/site_admin_task_new.html') |
| @without_trailing_slash |
| def new(self, **kw): |
| """Render the New Task form""" |
| return dict( |
| form_errors=request.validation.errors or {}, |
| form_values=request.validation.values or {}, |
| ) |
| |
| @expose() |
| @require_post() |
| @validate(v.CreateTaskSchema(), error_handler=new) |
| def create(self, task, task_args=None, user=None, path=None): |
| """Post a new task""" |
| args = task_args.get("args", ()) |
| kw = task_args.get("kwargs", {}) |
| config_dict = path |
| if user: |
| config_dict['user'] = user |
| with h.push_config(c, **config_dict): |
| task = task.post(*args, **kw) |
| redirect('view/%s' % task._id) |
| |
| @expose() |
| @require_post() |
| def resubmit(self, task_id): |
| try: |
| task = M.monq_model.MonQTask.query.get(_id=bson.ObjectId(task_id)) |
| except bson.errors.InvalidId: |
| task = None |
| if task is None: |
| raise HTTPNotFound() |
| task.state = 'ready' |
| redirect('../view/%s' % task._id) |
| |
| @expose('json:') |
| def task_doc(self, task_name, **kw): |
| """Return a task's docstring""" |
| error, doc = None, None |
| try: |
| task = v.TaskValidator.to_python(task_name) |
| doc = task.__doc__ or 'No doc string available' |
| doc = re.sub(r'^usage: ([^-][a-z_-]+ )?', # remove usage: and possible incorrect binary like "mod_wsgi" |
| 'Enter CLI formatted args above, like "args": ["--foo bar baz"]\n\n', |
| doc) |
| except Invalid as e: |
| error = str(e) |
| return dict(doc=doc, error=error) |
| |
| |
| class StatsController: |
| """Show neighborhood stats.""" |
| @expose('jinja:allura:templates/site_admin_stats.html') |
| @with_trailing_slash |
| def index(self, **kw): |
| neighborhoods = [] |
| for n in M.Neighborhood.query.find(): |
| project_count = M.Project.query.find( |
| dict(neighborhood_id=n._id)).count() |
| configured_count = M.Project.query.find( |
| dict(neighborhood_id=n._id, database_configured=True)).count() |
| neighborhoods.append((n.name, project_count, configured_count)) |
| neighborhoods.sort(key=lambda n: n[0]) |
| return dict(neighborhoods=neighborhoods) |
| |
| |
| class AdminUserDetailsController: |
| |
| @expose('jinja:allura:templates/site_admin_user_details.html') |
| @without_trailing_slash |
| def _default(self, username, limit=25, page=0): |
| user = M.User.by_username(username) |
| if not user or user.is_anonymous(): |
| raise HTTPNotFound() |
| projects = user.my_projects().all() |
| audit_log = self._audit_log(user, limit, page) |
| info = { |
| 'user': user, |
| 'status': h.get_user_status(user), |
| 'projects': projects, |
| 'audit_log': audit_log, |
| } |
| p = AuthenticationProvider.get(request) |
| info.update(p.user_details(user)) |
| return info |
| |
| def _audit_log(self, user, limit, page): |
| limit = int(limit) |
| page = int(page) |
| if user is None or user.is_anonymous(): |
| return dict( |
| entries=[], |
| imit=limit, |
| page=page, |
| count=0) |
| q = M.AuditLog.for_user(user) |
| count = q.count() |
| q = q.sort('timestamp', -1) |
| q = q.skip(page * limit) |
| if count > limit: |
| q = q.limit(limit) |
| else: |
| limit = count |
| c.audit_log_widget = W.audit |
| return dict( |
| entries=q.all(), |
| limit=limit, |
| page=page, |
| count=count) |
| |
| @expose() |
| @require_post() |
| def add_audit_trail_entry(self, **kw): |
| username = kw.get('username') |
| comment = kw.get('comment') |
| user = M.User.by_username(username) |
| if user and not user.is_anonymous() and comment: |
| M.AuditLog.comment_user(c.user, comment, user=user) |
| flash('Comment added', 'ok') |
| else: |
| flash(f'Can not add comment "{comment}" for user {user}') |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| @expose() |
| @require_post() |
| def set_status(self, username=None, status=None): |
| user = M.User.by_username(username) |
| if not user or user.is_anonymous(): |
| raise HTTPNotFound() |
| if status == 'enable' and (user.disabled or user.pending): |
| if user.pending: |
| AuthenticationProvider.get(request).activate_user(user, |
| audit=not user.disabled) # avoid dupe audits |
| if user.disabled: |
| AuthenticationProvider.get(request).enable_user(user) |
| flash('User enabled') |
| elif status == 'disable' and not user.disabled: |
| AuthenticationProvider.get(request).disable_user(user) |
| flash('User disabled') |
| elif status == 'pending': |
| if user.disabled: |
| AuthenticationProvider.get(request).enable_user(user, |
| audit=user.pending) # skip dupe audits |
| if not user.pending: |
| AuthenticationProvider.get(request).deactivate_user(user) |
| flash('Set user status to pending') |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| @expose() |
| @require_post() |
| def set_random_password(self, username=None): |
| user = M.User.by_username(username) |
| if not user or user.is_anonymous(): |
| raise HTTPNotFound() |
| pwd = h.random_password() |
| AuthenticationProvider.get(request).set_password(user, None, pwd) |
| h.auditlog_user('Set random password', user=user) |
| flash('Password is set', 'ok') |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| @expose() |
| @require_post() |
| def send_password_reset_link(self, username=None): |
| user = M.User.by_username(username) |
| if not user or user.is_anonymous(): |
| raise HTTPNotFound() |
| email = user.get_pref('email_address') |
| try: |
| allura.controllers.auth.AuthController().password_recovery_hash(email) |
| except HTTPFound: |
| pass # catch redirect to '/' |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| @expose() |
| @require_post() |
| def make_password_reset_url(self, username): |
| user = M.User.by_username(username) |
| if not user or user.is_anonymous(): |
| raise HTTPNotFound() |
| h.auditlog_user('Generated new password reset URL and shown to admin user', user=user) |
| return user.make_password_reset_url() |
| |
| @h.vardec |
| @expose() |
| @require_post() |
| def update_emails(self, username, **kw): |
| user = M.User.by_username(username) |
| if not user or user.is_anonymous(): |
| raise HTTPNotFound() |
| allura.controllers.auth.PreferencesController()._update_emails(user, admin=True, form_params=kw) |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| |
| class StatsSiteAdminExtension(SiteAdminExtension): |
| controllers = {'stats': StatsController} |
| |
| def update_sidebar_menu(self, links): |
| links.append(SitemapEntry('Stats', '/nf/admin/stats', |
| ui_icon=g.icons['stats'])) |