| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import json |
| import logging |
| import os |
| from pprint import pformat |
| |
| import six |
| from six.moves.urllib.parse import unquote, urlencode |
| |
| # Non-stdlib imports |
| from tg import expose, validate, redirect, flash, jsonify |
| from tg.decorators import with_trailing_slash, without_trailing_slash |
| from tg import tmpl_context as c, app_globals as g |
| from tg import request |
| from formencode import validators |
| from webob import exc |
| from ming.orm import session |
| |
| # Pyforge-specific imports |
| from allura import model as M |
| from allura.lib import helpers as h |
| from allura.lib import validators as v |
| from allura.app import Application, SitemapEntry, DefaultAdminController, ConfigOption |
| from allura.lib.search import search_app |
| from allura.lib.decorators import require_post, memorable_forget |
| from allura.lib.security import require_access, has_access |
| from allura.lib.utils import is_ajax, JSONForExport, permanent_redirect |
| from allura.tasks import notification_tasks |
| from allura.lib import exceptions as forge_exc |
| from allura.controllers import AppDiscussionController, BaseController, AppDiscussionRestController |
| from allura.controllers import DispatchIndex |
| from allura.controllers import attachments as ac |
| from allura.controllers.feed import FeedArgs, FeedController |
| from allura.controllers.rest import AppRestControllerMixin |
| from allura.lib import widgets as w |
| from allura.lib.widgets import form_fields as ffw |
| from allura.lib.widgets.subscriptions import SubscribeForm |
| from allura.lib.widgets.search import SearchResults, SearchHelp |
| |
| # Local imports |
| from forgewiki import model as WM |
| from forgewiki import version |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class W: |
| thread = w.Thread( |
| page=None, limit=None, page_size=None, count=None, |
| style='linear') |
| markdown_editor = ffw.MarkdownEdit() |
| confirmation = ffw.Lightbox(name='confirm', |
| trigger='a.post-link', |
| options="{ modalCSS: { minHeight: 0, width: 'inherit', top: '150px'}}") |
| label_edit = ffw.LabelEdit() |
| attachment_add = ffw.AttachmentAdd() |
| attachment_list = ffw.AttachmentList() |
| subscribe_form = SubscribeForm() |
| page_subscribe_form = SubscribeForm(thing='page') |
| page_list = ffw.PageList() |
| page_size = ffw.PageSize() |
| search_results = SearchResults() |
| help_modal = SearchHelp(fields={'title': 'title', |
| 'text': '"Page text"', |
| 'mod_date_dt': 'Last modified. Example: mod_date_dt:[2018-01-01T00:00:00Z TO *]', |
| 'labels_t': 'labels', |
| 'author_user_name_t': 'username (for comments only)', |
| }) |
| icons = { |
| 24: 'images/wiki_24.png', |
| 32: 'images/wiki_32.png', |
| 48: 'images/wiki_48.png' |
| } |
| |
| |
| class ForgeWikiApp(Application): |
| |
| '''This is the Wiki app for PyForge''' |
| __version__ = version.__version__ |
| permissions = ['configure', 'read', 'create', 'edit', 'delete', |
| 'unmoderated_post', 'post', 'moderate', 'admin'] |
| permissions_desc = { |
| 'read': 'View wiki pages.', |
| 'create': 'Create wiki pages.', |
| 'edit': 'Edit wiki pages.', |
| 'delete': 'Delete wiki pages.', |
| 'admin': 'Set permissions. Configure options. Set wiki home page.', |
| } |
| config_options = Application.config_options + [ |
| ConfigOption('AllowEmailPosting', bool, True) |
| ] |
| searchable = True |
| exportable = True |
| tool_label = 'Wiki' |
| tool_description = """ |
| Documentation is key to your project and the wiki tool |
| helps make it easy for anyone to contribute. |
| """ |
| default_mount_label = 'Wiki' |
| default_mount_point = 'wiki' |
| ordinal = 5 |
| default_root_page_name = 'Home' |
| icons = { |
| 24: 'images/wiki_24.png', |
| 32: 'images/wiki_32.png', |
| 48: 'images/wiki_48.png' |
| } |
| |
| def __init__(self, project, config): |
| Application.__init__(self, project, config) |
| self.root = RootController() |
| self.api_root = RootRestController() |
| self.admin = WikiAdminController(self) |
| |
| def has_access(self, user, topic): |
| return has_access(c.app, 'post')(user=user) |
| |
| def handle_message(self, topic, message): |
| log.info('Message from %s (%s)', |
| topic, self.config.options.mount_point) |
| log.info('Headers are: %s', message['headers']) |
| page = WM.Page.find_page(topic) |
| if page is None: |
| page = WM.Page.find_page(topic.replace('_', ' ')) |
| if page is not None: |
| self.handle_artifact_message(page, message) |
| else: |
| log.exception('Error getting artifact %s', topic) |
| |
| @property |
| def root_page_name(self): |
| globals = WM.Globals.query.get(app_config_id=self.config._id) |
| if globals is not None: |
| page_name = globals.root |
| else: |
| page_name = self.default_root_page_name |
| return page_name |
| |
| @root_page_name.setter |
| def root_page_name(self, new_root_page_name): |
| globals = WM.Globals.query.get(app_config_id=self.config._id) |
| if globals is not None: |
| globals.root = new_root_page_name |
| elif new_root_page_name != self.default_root_page_name: |
| globals = WM.Globals( |
| app_config_id=self.config._id, root=new_root_page_name) |
| if globals is not None: |
| session(globals).flush(globals) |
| |
| def default_root_page_text(self): |
| return """Welcome to your wiki! |
| |
| This is the default page, edit it as you see fit. To add a new page simply reference it within brackets, e.g.: [SamplePage]. |
| |
| The wiki uses [Markdown](%s) syntax. |
| |
| [[members limit=20]] |
| """ % (self.url + 'markdown_syntax/') |
| |
| @property |
| def show_discussion(self): |
| return self.config.options.get('show_discussion', True) |
| |
| @show_discussion.setter |
| def show_discussion(self, show): |
| self.config.options['show_discussion'] = bool(show) |
| |
| @property |
| def show_left_bar(self): |
| return self.config.options.get('show_left_bar', True) |
| |
| @show_left_bar.setter |
| def show_left_bar(self, show): |
| self.config.options['show_left_bar'] = bool(show) |
| |
| @property |
| def show_right_bar(self): |
| return self.config.options.get('show_right_bar', True) |
| |
| @show_right_bar.setter |
| def show_right_bar(self, show): |
| self.config.options['show_right_bar'] = bool(show) |
| |
| @property |
| def allow_email_posting(self): |
| return self.config.options.get('AllowEmailPosting', True) |
| |
| @allow_email_posting.setter |
| def allow_email_posting(self, show): |
| self.config.options['AllowEmailPosting'] = bool(show) |
| |
| def main_menu(self): |
| """Apps should provide their entries to be added to the main nav |
| :return: a list of :class:`SitemapEntries <allura.app.SitemapEntry>` |
| """ |
| return [SitemapEntry( |
| self.config.options.mount_label, |
| '.')] |
| |
| def sitemap_xml(self): |
| """ |
| Used for generating sitemap.xml. |
| If the root page has default content, omit it from the sitemap.xml. |
| Assumes :attr:`main_menu` will return an entry pointing to the root page. |
| :return: a list of :class:`SitemapEntries <allura.app.SitemapEntry>` |
| """ |
| root_page = WM.Page.query.get(app_config_id=self.config._id, title=self.root_page_name) |
| if self.should_noindex_page(root_page): |
| return [] |
| return self.main_menu() |
| |
| @property |
| @h.exceptionless([], log) |
| def sitemap(self): |
| menu_id = self.config.options.mount_label |
| with h.push_config(c, app=self): |
| pages = [ |
| SitemapEntry(p.title, p.url()) |
| for p in WM.Page.query.find(dict( |
| app_config_id=self.config._id, |
| deleted=False))] |
| return [ |
| SitemapEntry(menu_id, '.')[SitemapEntry('Pages')[pages]]] |
| |
| def should_noindex_page(self, page): |
| """Checks whether a page should not be indexed.""" |
| # If page has default name (i.e. 'Home') and has not been edited, noindex. |
| res = page and page['title'] == self.default_root_page_name and page['version'] == 1 |
| if not res: |
| if page and page['text'] in ('You can edit this description', ): |
| res = True |
| return res |
| |
| def create_common_wiki_menu(self, has_create_access, admin_menu=False): |
| links = [] |
| if has_create_access: |
| links += [SitemapEntry('Create Page', self.url + 'create_wiki_page/', |
| ui_icon=g.icons['add'], |
| className='admin_modal')] |
| if not admin_menu: |
| links += [SitemapEntry(''), |
| SitemapEntry('Wiki Home', self.url, className='wiki_home')] |
| links += [SitemapEntry('Browse Pages', self.url + 'browse_pages/'), |
| SitemapEntry('Browse Labels', self.url + 'browse_tags/')] |
| discussion = self.config.discussion |
| pending_mod_count = M.Post.query.find({ |
| 'discussion_id': discussion._id, |
| 'status': 'pending', |
| 'deleted': False |
| }).count() if discussion else 0 |
| if pending_mod_count and h.has_access(discussion, 'moderate')(): |
| links.append( |
| SitemapEntry( |
| 'Moderate', discussion.url() + 'moderate', ui_icon=g.icons['moderate'], |
| small=pending_mod_count)) |
| if not c.user.is_anonymous() and not admin_menu: |
| subscribed = M.Mailbox.subscribed(app_config_id=self.config._id) |
| subscribe_action = 'unsubscribe' if subscribed else 'subscribe' |
| subscribe_title = '{}{}'.format( |
| subscribe_action.capitalize(), |
| '' if subscribed else ' to wiki') |
| subscribe_url = '{}#toggle-{}'.format(self.url + 'subscribe', subscribe_action) |
| links.append(SitemapEntry(None)) |
| links.append(SitemapEntry(subscribe_title, subscribe_url, ui_icon=g.icons['mail'])) |
| if not admin_menu: |
| links += [SitemapEntry(''), |
| SitemapEntry('Formatting Help', '/nf/markdown_syntax', |
| extra_html_attrs={'target': '_blank', 'rel': 'nofollow'})] |
| return links |
| |
| def admin_menu(self, skip_common_menu=False): |
| links = [SitemapEntry('Set Home', |
| self.admin_url + 'home', |
| className='admin_modal')] |
| |
| if not self.show_left_bar and not skip_common_menu: |
| links += self.create_common_wiki_menu(has_create_access=True, admin_menu=True) |
| links += super().admin_menu(force_options=True) |
| return links |
| |
| @h.exceptionless([], log) |
| def sidebar_menu(self): |
| return self.create_common_wiki_menu(has_create_access=has_access(self, 'create')) |
| |
| def sidebar_menu_js(self): |
| return ''' |
| $('#sidebar').on('click', 'a[href$="#toggle-subscribe"]', function(e) { |
| e.preventDefault(); |
| var link = this; |
| var data = { |
| _session_id: $.cookie('_session_id'), |
| subscribe: '1' |
| }; |
| $.post(this.href, data, function(){ |
| $('#messages').notify('Subscribed to wiki.'); |
| $('span', link).text('Unsubscribe'); |
| $(link).attr('href', $(link).attr('href').replace('-subscribe','-unsubscribe')); |
| }); |
| }); |
| $('#sidebar').on('click', 'a[href$="#toggle-unsubscribe"]', function(e) { |
| e.preventDefault(); |
| var link = this; |
| var data = { |
| _session_id: $.cookie('_session_id'), |
| unsubscribe: '1' |
| }; |
| $.post(this.href, data, function(){ |
| $('#messages').notify('Unsubscribed.'); |
| $('span', link).text('Subscribe to wiki'); |
| $(link).attr('href', $(link).attr('href').replace('-unsubscribe','-subscribe')); |
| }); |
| }); |
| ''' |
| |
| def install(self, project): |
| 'Set up any default permissions and roles here' |
| self.config.options['project_name'] = project.name |
| super().install(project) |
| # Setup permissions |
| role_admin = M.ProjectRole.by_name('Admin')._id |
| role_developer = M.ProjectRole.by_name('Developer')._id |
| role_member = M.ProjectRole.by_name('Member')._id |
| role_auth = M.ProjectRole.by_name('*authenticated')._id |
| role_anon = M.ProjectRole.by_name('*anonymous')._id |
| self.config.acl = [ |
| M.ACE.allow(role_anon, 'read'), |
| M.ACE.allow(role_auth, 'post'), |
| M.ACE.allow(role_auth, 'unmoderated_post'), |
| M.ACE.allow(role_member, 'create'), |
| M.ACE.allow(role_member, 'edit'), |
| M.ACE.allow(role_developer, 'delete'), |
| M.ACE.allow(role_developer, 'moderate'), |
| M.ACE.allow(role_admin, 'configure'), |
| M.ACE.allow(role_admin, 'admin'), |
| ] |
| root_page_name = self.default_root_page_name |
| WM.Globals(app_config_id=c.app.config._id, root=root_page_name) |
| self.upsert_root(root_page_name, notify=False) |
| |
| def upsert_root(self, new_root, notify=True): |
| p = WM.Page.query.get(app_config_id=self.config._id, |
| title=new_root, deleted=False) |
| if p is None: |
| with h.push_config(c, app=self), h.notifications_disabled(c.project, disabled=not notify): |
| p = WM.Page.upsert(new_root) |
| p.text = self.default_root_page_text() |
| p.commit() |
| |
| def uninstall(self, project): |
| "Remove all the tool's artifacts from the database" |
| WM.WikiAttachment.query.remove(dict(app_config_id=self.config._id)) |
| WM.Page.query.remove(dict(app_config_id=self.config._id)) |
| WM.Globals.query.remove(dict(app_config_id=self.config._id)) |
| super().uninstall(project) |
| |
| def bulk_export(self, f, export_path='', with_attachments=False): |
| f.write('{"pages": [') |
| pages = list(WM.Page.query.find(dict( |
| app_config_id=self.config._id, |
| deleted=False))) |
| if with_attachments: |
| GenericClass = JSONForExport |
| self.export_attachments(pages, export_path) |
| else: |
| GenericClass = jsonify.JSONEncoder |
| for i, page in enumerate(pages): |
| if i > 0: |
| f.write(',') |
| json.dump(page, f, cls=GenericClass, indent=2) |
| f.write(']}') |
| |
| def export_attachments(self, pages, export_path): |
| for page in pages: |
| attachment_path = self.get_attachment_export_path(export_path, str(page._id)) |
| self.save_attachments(attachment_path, page.attachments) |
| |
| for post in page.discussion_thread.query_posts(status='ok'): |
| post_path = os.path.join( |
| attachment_path, |
| page.discussion_thread._id, |
| post.slug |
| ) |
| self.save_attachments(post_path, post.attachments) |
| |
| |
| class RootController(BaseController, DispatchIndex, FeedController): |
| |
| def __init__(self): |
| self._discuss = AppDiscussionController() |
| |
| def catch_all(self, *args, **kw): |
| url = '/{}/?{}'.format(request.controller_url, urlencode(kw)) |
| redirect(h.urlquote(h.really_unicode(url))) |
| |
| def _check_security(self): |
| require_access(c.app, 'read') |
| |
| @with_trailing_slash |
| @expose() |
| def index(self, **kw): |
| permanent_redirect(h.urlquote(h.really_unicode(c.app.root_page_name) + '/')) |
| |
| @expose() |
| def _lookup(self, pname, *remainder): |
| """Instantiate a Page object, and continue dispatch there.""" |
| return PageController(pname), remainder |
| |
| @expose() |
| def new_page(self, title): |
| redirect(h.urlquote(h.really_unicode(title) + '/')) |
| |
| @with_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/search.html') |
| @validate(dict(q=v.UnicodeString(if_empty=None), |
| history=validators.StringBool(if_empty=False), |
| search_comments=validators.StringBool(if_empty=False), |
| project=validators.StringBool(if_empty=False))) |
| def search(self, q=None, history=None, search_comments=None, project=None, limit=None, page=0, **kw): |
| 'local wiki search' |
| c.search_results = W.search_results |
| c.help_modal = W.help_modal |
| search_params = kw |
| search_params.update({ |
| 'q': q or '', |
| 'history': history, |
| 'search_comments': search_comments, |
| 'project': project, |
| 'limit': limit, |
| 'page': page, |
| 'allowed_types': ['WikiPage', 'WikiPage Snapshot'], |
| }) |
| return search_app(**search_params) |
| |
| @with_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/browse.html') |
| @validate(dict(sort=v.UnicodeString(if_empty='alpha'), |
| show_deleted=validators.StringBool(if_empty=False), |
| page=validators.Int(if_empty=0, if_invalid=0), |
| limit=validators.Int(if_empty=None, if_invalid=None)), |
| error_handler=catch_all) |
| def browse_pages(self, sort='alpha', show_deleted=False, page=0, limit=None, **kw): |
| 'list of all pages in the wiki' |
| c.page_list = W.page_list |
| c.page_size = W.page_size |
| limit, pagenum, start = g.handle_paging(limit, page, default=25) |
| count = 0 |
| pages = [] |
| uv_pages = [] |
| criteria = dict(app_config_id=c.app.config._id) |
| can_delete = has_access(c.app, 'delete')() |
| show_deleted = show_deleted and can_delete |
| if not can_delete: |
| criteria['deleted'] = False |
| q = WM.Page.query.find(criteria) |
| if sort == 'alpha': |
| q = q.sort('title') |
| count = q.count() |
| q = q.skip(start).limit(int(limit)) |
| for page in q: |
| recent_edit = page.history().first() |
| p = dict(title=page.title, url=page.url(), deleted=page.deleted) |
| if recent_edit: |
| p['updated'] = recent_edit.timestamp |
| p['user_label'] = recent_edit.author.display_name |
| p['user_name'] = recent_edit.author.username |
| pages.append(p) |
| else: |
| if sort == 'recent': |
| uv_pages.append(p) |
| else: |
| pages.append(p) |
| if sort == 'recent': |
| pages.sort(reverse=True, key=lambda x: (x['updated'])) |
| pages = pages + uv_pages |
| return dict( |
| pages=pages, can_delete=can_delete, show_deleted=show_deleted, |
| limit=limit, count=count, page=pagenum) |
| |
| @with_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/browse_tags.html') |
| @validate(dict(sort=v.UnicodeString(if_empty='alpha'), |
| page=validators.Int(if_empty=0, if_invalid=0), |
| limit=validators.Int(if_empty=None, if_invalid=None))) |
| def browse_tags(self, sort='alpha', page=0, limit=None, **kw): |
| 'list of all labels in the wiki' |
| c.page_list = W.page_list |
| c.page_size = W.page_size |
| limit, pagenum, start = g.handle_paging(limit, page, default=25) |
| count = 0 |
| page_tags = {} |
| q = WM.Page.query.find(dict(app_config_id=c.app.config._id, |
| deleted=False, |
| labels={'$ne': []})) |
| for page in q: |
| if page.labels: |
| for label in page.labels: |
| if label not in page_tags: |
| page_tags[label] = [] |
| page_tags[label].append(page) |
| count = len(page_tags) |
| name_labels = sorted(page_tags) |
| return dict(labels=page_tags, |
| limit=limit, |
| count=count, |
| page=pagenum, |
| name_labels=name_labels[start:start + limit]) |
| |
| @with_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/create_page.html') |
| def create_wiki_page(self, **kw): |
| return {} |
| |
| @expose() |
| def markdown_syntax(self, **kw): |
| permanent_redirect('/nf/markdown_syntax') |
| |
| @expose() |
| @require_post() |
| @validate(W.subscribe_form) |
| def subscribe(self, subscribe=None, unsubscribe=None): |
| if subscribe: |
| M.Mailbox.subscribe(type='direct') |
| elif unsubscribe: |
| M.Mailbox.unsubscribe() |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| |
| class PageController(BaseController, FeedController): |
| |
| def __init__(self, title): |
| self.title = h.really_unicode(unquote(title)) |
| self.page = WM.Page.query.get( |
| app_config_id=c.app.config._id, title=self.title) |
| if self.page is not None: |
| self.attachment = WikiAttachmentsController(self.page) |
| |
| def _check_security(self): |
| if self.page: |
| require_access(self.page, 'read') |
| if self.page.deleted: |
| require_access(self.page, 'delete') |
| elif has_access(c.app, 'create'): |
| self.rate_limit(WM.Page, 'Page create/edit') |
| else: |
| raise exc.HTTPNotFound |
| |
| def fake_page(self): |
| return dict( |
| title=self.title, |
| text='', |
| labels=[], |
| viewable_by=['all'], |
| attachments=[]) |
| |
| def get_version(self, version): |
| if not version: |
| return self.page |
| try: |
| return self.page.get_version(version) |
| except (ValueError, IndexError): |
| return None |
| |
| @expose() |
| def _lookup(self, pname, *remainder): |
| page = WM.Page.query.get( |
| app_config_id=c.app.config._id, title=pname) |
| if page: |
| redirect(page.url()) |
| else: |
| raise exc.HTTPNotFound |
| |
| @with_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/page_view.html') |
| @validate(dict(version=validators.Int(if_empty=None, if_invalid=None), |
| page=validators.Int(if_empty=0, if_invalid=0), |
| limit=validators.Int(if_empty=None, if_invalid=None))) |
| def index(self, version=None, page=0, limit=None, **kw): |
| if not self.page: |
| redirect(c.app.url + h.urlquote(self.title) + '/edit') |
| c.confirmation = W.confirmation |
| c.thread = W.thread |
| c.attachment_list = W.attachment_list |
| c.subscribe_form = W.page_subscribe_form |
| post_count = self.page.discussion_thread.post_count |
| limit, pagenum, _ = g.handle_paging(limit, page) |
| limit, pagenum = h.paging_sanitizer(limit, pagenum, post_count) |
| page = self.get_version(version) |
| if page is None: |
| if version: |
| redirect('.?version=%d' % (version - 1)) |
| else: |
| redirect('.') |
| cur = page.version |
| if cur > 1: |
| prev = cur - 1 |
| else: |
| prev = None |
| next = cur + 1 |
| hide_left_bar = not (c.app.show_left_bar) |
| subscribed_to_page = M.Mailbox.subscribed(artifact=self.page) |
| latest_version = self.page.history().limit(1).first() |
| is_latest_version = cur == latest_version.version |
| return dict( |
| page=page, |
| cur=cur, prev=prev, next=next, |
| page_subscribed=subscribed_to_page, |
| hide_left_bar=hide_left_bar, show_meta=c.app.show_right_bar, |
| pagenum=pagenum, limit=limit, count=post_count, |
| noindex=c.app.should_noindex_page(self.page), |
| is_latest_version=is_latest_version,) |
| |
| @without_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/page_edit.html') |
| def edit(self): |
| page_exists = self.page |
| if self.page: |
| require_access(self.page, 'edit') |
| page = self.page |
| else: |
| page = self.fake_page() |
| self.rate_limit(WM.Page, 'Page create/edit') # check before trying to save |
| c.confirmation = W.confirmation |
| c.markdown_editor = W.markdown_editor |
| c.attachment_add = W.attachment_add |
| c.attachment_list = W.attachment_list |
| c.label_edit = W.label_edit |
| hide_left_bar = not c.app.show_left_bar |
| return dict(page=page, |
| page_exists=page_exists, |
| hide_left_bar=hide_left_bar, |
| subscribed_to_tool=M.Mailbox.subscribed(), |
| ) |
| |
| @without_trailing_slash |
| @expose('json:') |
| @require_post() |
| def delete(self, **kw): |
| require_access(self.page, 'delete') |
| M.AuditLog.log('{}: deleted wiki page "{}"'.format( |
| c.app.config.options['mount_point'], |
| self.page.title, |
| )) |
| self.page.delete() |
| return dict(location='../' + self.page.title + '/?deleted=True') |
| |
| @without_trailing_slash |
| @expose('json:') |
| @require_post() |
| def undelete(self, **kw): |
| require_access(self.page, 'delete') |
| self.page.deleted = False |
| M.AuditLog.log('{}: undeleted wiki page "{}"'.format( |
| c.app.config.options['mount_point'], |
| self.page.title, |
| )) |
| M.Shortlink.from_artifact(self.page) |
| return dict(location='./edit') |
| |
| @without_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/page_history.html') |
| @validate(dict(page=validators.Int(if_empty=0, if_invalid=0), |
| limit=validators.Int(if_empty=None, if_invalid=None))) |
| def history(self, page=0, limit=None, **kw): |
| if not self.page: |
| raise exc.HTTPNotFound |
| c.page_list = W.page_list |
| c.page_size = W.page_size |
| c.confirmation = W.confirmation |
| limit, pagenum, start = g.handle_paging(limit, page, default=25) |
| count = 0 |
| pages = self.page.history() |
| count = pages.count() |
| pages = pages.skip(start).limit(int(limit)) |
| return dict(title=self.title, pages=pages, |
| limit=limit, count=count, page=pagenum) |
| |
| @without_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/page_diff.html') |
| @validate(dict( |
| v1=validators.Int(), |
| v2=validators.Int())) |
| def diff(self, v1, v2, **kw): |
| if not self.page: |
| raise exc.HTTPNotFound |
| p1 = self.get_version(v1) |
| p2 = self.get_version(v2) |
| result = h.diff_text(p1.text, p2.text) |
| return dict(p1=p1, p2=p2, edits=result) |
| |
| @without_trailing_slash |
| @expose(content_type='text/plain') |
| def raw(self): |
| if not self.page: |
| raise exc.HTTPNotFound |
| return pformat(self.page) |
| |
| def get_feed(self, project, app, user): |
| """Return a :class:`allura.controllers.feed.FeedArgs` object describing |
| the xml feed for this controller. |
| |
| Overrides :meth:`allura.controllers.feed.FeedController.get_feed`. |
| |
| """ |
| if not self.page: |
| return None |
| return FeedArgs( |
| {'ref_id': self.page.index_id()}, |
| 'Recent changes to %s' % self.page.title, |
| self.page.url()) |
| |
| @without_trailing_slash |
| @expose('json:') |
| @require_post() |
| @validate(dict(version=validators.Int(if_empty=1, if_invalid=1))) |
| def revert(self, version, **kw): |
| if not self.page: |
| raise exc.HTTPNotFound |
| require_access(self.page, 'edit') |
| orig = self.get_version(version) |
| if orig: |
| self.page.text = orig.text |
| self.page.commit() |
| return dict(location='.') |
| |
| @memorable_forget() |
| @without_trailing_slash |
| @h.vardec |
| @expose() |
| @require_post() |
| def update(self, title=None, text=None, |
| labels=None, |
| subscribe=False, |
| **kw): |
| activity_verb = 'created' |
| if not title: |
| flash('You must provide a title for the page.', 'error') |
| redirect('edit') |
| title = title.replace('/', '-') |
| self.rate_limit(WM.Page, 'Page create/edit') |
| if not self.page: |
| # the page doesn't exist yet, so create it |
| self.page = WM.Page.upsert(self.title) |
| M.AuditLog.log('{}: created new wiki page "{}"'.format( |
| c.app.config.options['mount_point'], |
| self.page.title, |
| )) |
| else: |
| require_access(self.page, 'edit') |
| activity_verb = 'modified' |
| name_conflict = None |
| if self.page.title != title: |
| name_conflict = WM.Page.query.find( |
| dict(app_config_id=c.app.config._id, title=title, deleted=False)).first() |
| if name_conflict: |
| flash('There is already a page named "%s".' % title, 'error') |
| else: |
| if self.page.title == c.app.root_page_name: |
| WM.Globals.query.get( |
| app_config_id=c.app.config._id).root = title |
| M.AuditLog.log('{}: renamed wiki page "{}" => "{}"'.format( |
| c.app.config.options['mount_point'], |
| self.page.title, |
| title, |
| )) |
| self.page.title = title |
| activity_verb = 'renamed' |
| old_text = self.page.text |
| self.page.text = text |
| if labels: |
| self.page.labels = labels.split(',') |
| else: |
| self.page.labels = [] |
| self.page.commit(subscribe=subscribe) |
| g.spam_checker.check(title + '\n' + text, artifact=self.page, |
| user=c.user, content_type='wiki') |
| if activity_verb == 'created': |
| notification_tasks.send_usermentions_notification.post(self.page.index_id(), text) |
| elif activity_verb == 'modified': |
| notification_tasks.send_usermentions_notification.post(self.page.index_id(), text, old_text) |
| g.director.create_activity(c.user, activity_verb, self.page, |
| related_nodes=[c.project], tags=['wiki']) |
| redirect('../' + h.urlquote(h.really_unicode(self.page.title)) + ('/' if not name_conflict else '/edit')) |
| |
| @without_trailing_slash |
| @expose('json:') |
| @require_post() |
| def update_markdown(self, text=None, **kw): |
| if has_access(self.page, 'edit'): |
| self.page.text = text |
| self.page.commit() |
| g.spam_checker.check(text, artifact=self.page, user=c.user, content_type='wiki') |
| g.director.create_activity(c.user, 'modified', self.page, related_nodes=[c.project], tags=['wiki']) |
| return { |
| 'status': 'success' |
| } |
| else: |
| return { |
| 'status': 'no_permission' |
| } |
| |
| @without_trailing_slash |
| @expose() |
| def get_markdown(self): |
| return self.page.text |
| |
| @without_trailing_slash |
| @expose() |
| @require_post() |
| def attach(self, file_info=None, **kw): |
| if not self.page: |
| raise exc.HTTPNotFound |
| require_access(self.page, 'edit') |
| self.page.add_multiple_attachments(file_info) |
| if is_ajax(request): |
| return |
| redirect(six.ensure_text(request.referer or '/')) |
| |
| @expose('json:') |
| @require_post() |
| @validate(W.subscribe_form) |
| def subscribe(self, subscribe=None, unsubscribe=None, **kw): |
| if not self.page: |
| raise exc.HTTPNotFound |
| if subscribe: |
| self.page.subscribe(type='direct') |
| elif unsubscribe: |
| self.page.unsubscribe() |
| return { |
| 'status': 'ok', |
| 'subscribed': M.Mailbox.subscribed(artifact=self.page), |
| 'subscribed_to_tool': M.Mailbox.subscribed(), |
| 'subscribed_to_entire_name': 'wiki', |
| } |
| |
| |
| class WikiAttachmentController(ac.AttachmentController): |
| AttachmentClass = WM.WikiAttachment |
| edit_perm = 'edit' |
| |
| |
| class WikiAttachmentsController(ac.AttachmentsController): |
| AttachmentControllerClass = WikiAttachmentController |
| |
| |
| class RootRestController(BaseController, AppRestControllerMixin): |
| |
| def __init__(self): |
| self._discuss = AppDiscussionRestController() |
| |
| def _check_security(self): |
| require_access(c.app, 'read') |
| |
| @expose('json:') |
| def index(self, **kw): |
| page_titles = [] |
| pages = WM.Page.query.find( |
| dict(app_config_id=c.app.config._id, deleted=False)) |
| for page in pages: |
| if has_access(page, 'read')(): |
| page_titles.append(page.title) |
| return dict(pages=page_titles) |
| |
| @expose() |
| def _lookup(self, title, *remainder): |
| return PageRestController(title), remainder |
| |
| |
| class PageRestController(BaseController): |
| |
| def __init__(self, title): |
| self.title = h.really_unicode(unquote(title)) if title else None |
| self.page = WM.Page.query.get(app_config_id=c.app.config._id, |
| title=self.title, |
| deleted=False) |
| |
| def _check_security(self): |
| if self.page: |
| require_access(self.page, 'read') |
| if self.page.deleted: |
| require_access(self.page, 'delete') |
| |
| @h.vardec |
| @expose('json:') |
| def index(self, **kw): |
| if request.method == 'POST': |
| return self._update_page(self.title, **kw) |
| if self.page is None: |
| raise exc.HTTPNotFound() |
| return self.page.__json__(posts_limit=10) |
| |
| def _update_page(self, title, **post_data): |
| with h.notifications_disabled(c.project): |
| if not self.page: |
| require_access(c.app, 'create') |
| if WM.Page.is_limit_exceeded(c.app.config, user=c.user): |
| log.warn('Page create/edit rate limit exceeded. %s', |
| c.app.config.url()) |
| raise forge_exc.HTTPTooManyRequests() |
| self.page = WM.Page.upsert(title) |
| self.page.viewable_by = ['all'] |
| else: |
| require_access(self.page, 'edit') |
| self.page.text = post_data['text'] |
| if 'labels' in post_data: |
| self.page.labels = post_data['labels'].split(',') |
| self.page.commit() |
| return {} |
| |
| |
| class WikiAdminController(DefaultAdminController): |
| |
| def _check_security(self): |
| require_access(self.app, 'configure') |
| |
| @with_trailing_slash |
| def index(self, **kw): |
| redirect('home') |
| |
| @without_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/admin_home.html') |
| def home(self): |
| return dict(app=self.app, |
| home=self.app.root_page_name, |
| allow_config=has_access(self.app, 'configure')()) |
| |
| @without_trailing_slash |
| @expose('jinja:forgewiki:templates/wiki/admin_options.html') |
| def options(self): |
| return dict(app=self.app, |
| allow_config=has_access(self.app, 'configure')()) |
| |
| @without_trailing_slash |
| @expose() |
| @require_post() |
| def set_home(self, new_home): |
| old_home = self.app.root_page_name |
| self.app.root_page_name = new_home |
| self.app.upsert_root(new_home) |
| flash('Home updated') |
| mount_base = c.project.url() + \ |
| self.app.config.options.mount_point + '/' |
| url = h.really_unicode(mount_base) + h.really_unicode(new_home) + '/' |
| M.AuditLog.log('{}: set home page "{}" => "{}"'.format( |
| self.app.config.options['mount_point'], |
| old_home, |
| new_home, |
| )) |
| redirect(h.urlquote(url)) |
| |
| @without_trailing_slash |
| @expose() |
| @require_post() |
| def set_options(self, show_discussion=False, show_left_bar=False, show_right_bar=False, |
| allow_email_posting=False): |
| mount_point = self.app.config.options['mount_point'] |
| |
| if self.app.show_discussion != bool(show_discussion): |
| M.AuditLog.log('{}: set option "{}" {} => {}'.format( |
| mount_point, "Show Discussion", self.app.show_discussion, bool(show_discussion))) |
| self.app.show_discussion = show_discussion |
| |
| if self.app.show_left_bar != bool(show_left_bar): |
| M.AuditLog.log('{}: set option "{}" {} => {}'.format( |
| mount_point, "Show left Bar", self.app.show_left_bar, bool(show_left_bar))) |
| self.app.show_left_bar = show_left_bar |
| |
| if self.app.show_right_bar != bool(show_right_bar): |
| M.AuditLog.log('{}: set option "{}" {} => {}'.format( |
| mount_point, "Show metadata", self.app.show_right_bar, bool(show_right_bar))) |
| self.app.show_right_bar = show_right_bar |
| |
| if self.app.allow_email_posting != bool(allow_email_posting): |
| M.AuditLog.log('{}: set option "{}" {} => {}'.format( |
| mount_point, "Allow posting replies via email", self.app.allow_email_posting, bool(allow_email_posting))) |
| self.app.allow_email_posting = allow_email_posting |
| |
| flash('Wiki options updated') |
| redirect(six.ensure_text(request.referer or '/')) |