blob: 5a131c7ab3a60acf07c03462578a841762ca285e [file] [log] [blame]
import cgi
import random
import shlex
import string
import logging
import pymongo
from pylons import c, g, request
from . import helpers as h
from . import security
log = logging.getLogger(__name__)
_macros = {}
class macro(object):
def __init__(self, context=None):
self._context = context
def __call__(self, func):
_macros[func.__name__] = (func, self._context)
return func
class parse(object):
def __init__(self, context):
self._context = context
def __call__(self, s):
try:
if s.startswith('quote '):
return '[[' + s[len('quote '):] + ']]'
try:
parts = [ unicode(x, 'utf-8') for x in shlex.split(s.encode('utf-8')) ]
if not parts: return '[[' + s + ']]'
macro = self._lookup_macro(parts[0])
if not macro: return '[[' + s + ']]'
for t in parts[1:]:
if '=' not in t:
return '[-%s: missing =-]' % ' '.join(parts)
args = dict(t.split('=', 1) for t in parts[1:])
response = macro(**h.encode_keys(args))
return response
except (ValueError, TypeError), ex:
msg = cgi.escape(u'[[%s]] (%s)' % (s, repr(ex)))
return '\n<div class="error"><pre><code>%s</code></pre></div>' % msg
except Exception, ex:
raise
return '[[Error parsing %s: %s]]' % (s, ex)
def _lookup_macro(self, s):
macro, context = _macros.get(s, None)
if context is None or context == self._context:
return macro
else:
return None
template_neighborhood_feeds = string.Template('''
<div class="neighborhood_feed_entry">
<h3><a href="$href">$title</a></h3>
<p>
by <em>$author</em>
<small>$ago</small>
</p>
<p>$description</p>
</div>
''')
@macro('neighborhood-wiki')
def neighborhood_feeds(tool_name, max_number=5, sort='pubdate'):
from allura import model as M
feed = M.Feed.query.find(
dict(
tool_name=tool_name,
neighborhood_id=c.project.neighborhood._id))
feed = feed.sort(sort, pymongo.DESCENDING).limit(int(max_number)).all()
output = '\n'.join(
template_neighborhood_feeds.substitute(dict(
href=item.link,
title=item.title,
author=item.author_name,
ago=h.ago(item.pubdate),
description=item.description))
for item in feed)
return output
template_neighborhood_blog_posts = string.Template('''
<div class="neighborhood_feed_entry">
<h3><a href="$href">$title</a></h3>
<p>
by <em>$author</em>
<small>$ago</small>
</p>
$description
</div>
''')
@macro('neighborhood-wiki')
def neighborhood_blog_posts(max_number=5, sort='timestamp', summary=False):
from forgeblog import model as BM
posts = BM.BlogPost.query.find(dict(
neighborhood_id=c.project.neighborhood._id,
state='published'))
posts = posts.sort(sort, pymongo.DESCENDING).limit(int(max_number)).all()
output = '\n'.join(
template_neighborhood_blog_posts.substitute(dict(
href=post.url(),
title=post.title,
author=post.author().display_name,
ago=h.ago(post.timestamp),
description=summary and '&nbsp;' or g.markdown.convert(post.text)))
for post in posts if security.has_access(post, 'read', project=post.app.project)() and
security.has_access(post.app.project, 'read', project=post.app.project)())
return output
@macro()
def project_blog_posts(max_number=5, sort='timestamp', summary=False, mount_point=None):
from forgeblog import model as BM
app_config_ids = []
for conf in c.project.app_configs:
if conf.tool_name.lower() == 'blog' and (mount_point is None or conf.options.mount_point==mount_point):
app_config_ids.append(conf._id)
posts = BM.BlogPost.query.find({'state':'published','app_config_id':{'$in':app_config_ids}})
posts = posts.sort(sort, pymongo.DESCENDING).limit(int(max_number)).all()
output = '\n'.join(
template_neighborhood_blog_posts.substitute(dict(
href=post.url(),
title=post.title,
author=post.author().display_name,
ago=h.ago(post.timestamp),
description=summary and '&nbsp;' or g.markdown.convert(post.text)))
for post in posts if security.has_access(post, 'read', project=post.app.project)() and
security.has_access(post.app.project, 'read', project=post.app.project)())
return output
def get_projects_for_macro(category=None, display_mode='grid', sort='last_updated',
show_total=False, limit=100, labels='', award='', private=False,
columns=1, show_proj_icon=True, show_download_button=True, show_awards_banner=True,
initial_q={}):
from allura.lib.widgets.project_list import ProjectList
from allura.lib import utils
from allura import model as M
# 'trove' is internal substitution for 'category' filter in wiki macro
trove = category
limit = int(limit)
q = dict(
deleted=False,
shortname={'$ne':'--init--'})
q.update(initial_q)
if labels:
or_labels = labels.split('|')
q['$or'] = [{'labels': {'$all': l.split(',')}} for l in or_labels]
if trove is not None:
trove = M.TroveCategory.query.get(fullpath=trove)
if award:
aw = M.Award.query.find(dict(
created_by_neighborhood_id=c.project.neighborhood_id,
short=award)).first()
if aw:
ids = [grant.granted_to_project_id for grant in
M.AwardGrant.query.find(dict(
granted_by_neighborhood_id=c.project.neighborhood_id,
award_id=aw._id))]
if '_id' in q:
ids = list(set(q['_id']['$in']).intersection(ids))
q['_id'] = {'$in': ids}
if trove is not None:
q['trove_' + trove.type] = trove._id
sort_key, sort_dir = 'last_updated', pymongo.DESCENDING
if sort == 'alpha':
sort_key, sort_dir = 'name', pymongo.ASCENDING
elif sort == 'random':
sort_key, sort_dir = None, None
elif sort == 'last_registered':
sort_key, sort_dir = '_id', pymongo.DESCENDING
elif sort == '_id':
sort_key, sort_dir = '_id', pymongo.DESCENDING
projects = []
if private:
# Only return private projects.
# Can't filter these with a mongo query directly - have to iterate
# through and check the ACL of each project.
for chunk in utils.chunked_find(M.Project, q, sort_key=sort_key,
sort_dir=sort_dir):
projects.extend([p for p in chunk if p.private])
total = len(projects)
if sort == 'random':
projects = random.sample(projects, min(limit, total))
else:
projects = projects[:limit]
else:
total = None
if sort == 'random':
# MongoDB doesn't have a random sort built in, so...
# 1. Do a direct pymongo query (faster than ORM) to fetch just the
# _ids of objects that match our criteria
# 2. Choose a random sample of those _ids
# 3. Do an ORM query to fetch the objects with those _ids
# 4. Shuffle the results
from ming.orm import mapper
m = mapper(M.Project)
collection = M.main_doc_session.db[m.collection.m.collection_name]
docs = list(collection.find(q, {'_id': 1}))
if docs:
ids = [doc['_id'] for doc in
random.sample(docs, min(limit, len(docs)))]
if '_id' in q:
ids = list(set(q['_id']['$in']).intersection(ids))
q['_id'] = {'$in': ids}
projects = M.Project.query.find(q).all()
random.shuffle(projects)
else:
projects = M.Project.query.find(q).limit(limit).sort(sort_key,
sort_dir).all()
pl = ProjectList()
g.resource_manager.register(pl)
response = pl.display(projects=projects, display_mode=display_mode,
columns=columns, show_proj_icon=show_proj_icon,
show_download_button=show_download_button,
show_awards_banner=show_awards_banner)
if show_total:
if total is None:
total = 0
for p in M.Project.query.find(q):
if h.has_access(p, 'read')():
total = total + 1
response = '<p class="macro_projects_total">%s Projects</p>%s' % \
(total, response)
return response
@macro('neighborhood-wiki')
def projects(category=None, display_mode='grid', sort='last_updated',
show_total=False, limit=100, labels='', award='', private=False,
columns=1, show_proj_icon=True, show_download_button=True, show_awards_banner=True):
initial_q = dict(neighborhood_id=c.project.neighborhood_id)
return get_projects_for_macro(category=category, display_mode=display_mode, sort=sort,
show_total=show_total, limit=limit, labels=labels, award=award, private=private,
columns=columns, show_proj_icon=show_proj_icon, show_download_button=show_download_button,
show_awards_banner=show_awards_banner,
initial_q=initial_q)
@macro('userproject-wiki')
def my_projects(category=None, display_mode='grid', sort='last_updated',
show_total=False, limit=100, labels='', award='', private=False,
columns=1, show_proj_icon=True, show_download_button=True, show_awards_banner=True):
myproj_user = c.project.user_project_of
if myproj_user is None:
myproj_user = c.user.anonymous()
ids = []
for p in myproj_user.my_projects():
ids.append(p._id)
initial_q = dict(_id={'$in': ids})
return get_projects_for_macro(category=category, display_mode=display_mode, sort=sort,
show_total=show_total, limit=limit, labels=labels, award=award, private=private,
columns=columns, show_proj_icon=show_proj_icon, show_download_button=show_download_button,
show_awards_banner=show_awards_banner,
initial_q=initial_q)
@macro()
def project_screenshots():
from allura.lib.widgets.project_list import ProjectScreenshots
ps = ProjectScreenshots()
g.resource_manager.register(ps)
response = ps.display(project=c.project)
return response
@macro()
def download_button():
from allura import model as M
from allura.lib.widgets.macros import DownloadButton
button = DownloadButton(project=c.project)
g.resource_manager.register(button)
response = button.display(project=c.project)
return response
@macro()
def include(ref=None, **kw):
from allura import model as M
from allura.lib.widgets.macros import Include
if ref is None:
return '[-include-]'
link = M.Shortlink.lookup(ref)
if not link:
return '[[include %s (not found)]]' % ref
artifact = link.ref.artifact
if artifact is None:
return '[[include (artifact not found)]]' % ref
included = request.environ.setdefault('allura.macro.included', set())
if artifact in included:
return '[[include %s (already included)]' % ref
else:
included.add(artifact)
sb = Include()
g.resource_manager.register(sb)
response = sb.display(artifact=artifact, attrs=kw)
return response
@macro()
def img(src=None, **kw):
attrs = ('%s="%s"' % t for t in kw.iteritems())
included = request.environ.setdefault('allura.macro.att_embedded', set())
included.add(src)
if '://' in src:
return '<img src="%s" %s/>' % (src, ' '.join(attrs))
else:
return '<img src="./attachment/%s" %s/>' % (src, ' '.join(attrs))
template_project_admins = string.Template('<a href="$url">$name</a><br/>')
@macro()
def project_admins():
from allura import model as M
output = ''
admin_role = M.ProjectRole.query.get(project_id=c.project._id,name='Admin')
if admin_role:
output = '\n'.join(
template_project_admins.substitute(dict(
url=user_role.user.url(),
name=user_role.user.display_name))
for user_role in admin_role.users_with_role())
return output