import inspect
import sys
import json
import logging
import six
from http.cookies import SimpleCookie as Cookie
from collections import defaultdict
from six.moves.urllib.parse import unquote
from datetime import datetime
from datetime import timedelta
from decorator import decorator
import wrapt
from paste.deploy.converters import asint
from tg.decorators import before_validate
from tg import request, redirect, session, config
from tg.render import render
from webob import exc
from tg import tmpl_context as c
from tg import response
from webob.exc import HTTPFound, WSGIHTTPException
from allura.lib import helpers as h
from allura.lib import utils
log = logging.getLogger(__name__)
def task(*args, **kw):
"""Decorator that adds a ``.post()`` function to the decorated callable.
Calling ``<original_callable>.post(*args, **kw)`` queues the callable for
execution by a background worker process. All parameters must be
Example usage::
def myfunc():
def myotherfunc():
# No email notifications will be sent for c.project during this task
def task_(func):
def post(*args, **kwargs):
delay = kwargs.pop('delay', 0)
flush_immediately = kwargs.pop('flush_immediately', True)
project = getattr(c, 'project', None)
cm = (h.notifications_disabled if project and
kw.get('notifications_disabled') else h.null_contextmanager)
with cm(project):
from allura import model as M
return, args, kwargs, delay=delay, flush_immediately=flush_immediately)
# if decorating a class, have to make it a staticmethod
# or it gets a spurious cls argument = staticmethod(post) if inspect.isclass(func) else post
return func
if len(args) == 1 and callable(args[0]):
return task_(args[0])
return task_
class event_handler:
'''Decorator to register event handlers'''
listeners = defaultdict(set)
def __init__(self, *topics):
self.topics = topics
def __call__(self, func):
for t in self.topics:
return func
class require_post:
A decorator to require controllers by accessed with a POST only. Use whenever data will be modified by a
controller, since that's what POST is good for. We have CSRF protection middleware on POSTs, too.
def __init__(self, redir=None):
self.redir = redir
def __call__(self, func):
def check_method(remainder, params):
if request.method != 'POST':
if self.redir is not None:
raise exc.HTTPMethodNotAllowed(headers={'Allow': 'POST'})
return func
def reconfirm_auth(func, *args, **kwargs):
A decorator to require the user to reconfirm their login. Useful for sensitive pages.
from allura.lib.plugin import AuthenticationProvider
if request.POST.get('password'):
if AuthenticationProvider.get(request).validate_password(c.user, request.POST['password']):
session['auth-reconfirmed'] = datetime.utcnow()
kwargs.pop('password', None)
c.form_errors['password'] = 'Invalid password.'
allowed_timedelta = timedelta(seconds=asint(config.get('auth.reconfirm.seconds', 60)))
last_reconfirm = session.get('auth-reconfirmed', datetime.min)
if datetime.utcnow() - last_reconfirm <= allowed_timedelta:
return func(*args, **kwargs)
return render({}, 'jinja', "allura:templates/reconfirm_auth.html")
def getattr_(obj, name, default_thunk):
"Similar to .setdefault in dictionaries."
return getattr(obj, name)
except AttributeError:
default = default_thunk()
setattr(obj, name, default)
return default
def memoize(func, instance, args, kwargs):
Cache the method's result, for the given args
if instance is None:
# decorating a simple function
dic = getattr_(func, "_memoize_dic", dict)
# decorating a method
dic = getattr_(instance, f"_memoize_dic__{func.__name__}", dict)
cache_key = (args, frozenset(list(kwargs.items())))
if cache_key in dic:
return dic[cache_key]
result = func(*args, **kwargs)
dic[cache_key] = result
return result
def memoize_cleanup(obj):
Remove any _memoize_dic_* keys that were created by @memoize on methods
attrs = obj.keys() if hasattr(obj, 'keys') else obj.__dir__()
for k in list(attrs):
if k.startswith('_memoize_dic'):
del obj[k]
except TypeError:
delattr(obj, k)
def memorable_forget():
Decorator to mark a controller action as needing to "forget" remembered input values on the next
page render, if we detect that the form post was processed successfully
def _ok_to_forget(response, controller_result, raised):
Look for signals that say it's probably ok to forget remembered inputs for the current form.
Checks here will need to be expanded for controller actions that behave differently
than others upon successful processing of their particular request
# if there is a flash message with status "ok", then we can forget. If it is "error" we cannot.
if response.headers:
cookies = Cookie(response.headers.get('Set-Cookie', ''))
if cookies and 'webflash' in cookies:
webflash_value = json.loads(unquote(cookies['webflash'].value))
if webflash_value['status'] == 'ok':
return True
elif webflash_value['status'] == 'error':
return False
# if the controller raised a 302, we can assume the value will be remembered by the app
# if needed, and forget.
if raised and isinstance(raised, HTTPFound):
return True
return False
def forget(controller_result, raised=None):
Check if the form's inputs can be forgotten, and set the cookie to forget if so.
:param res: the result of the controller action
:param raised: any error (redirect or exception) raised by the controller action
if _ok_to_forget(response, controller_result, raised):
response.set_cookie('memorable_forget', request.path, secure=request.environ['beaker.session'].secure)
def _inner(func, *args, **kwargs):
res, raised = (None, None)
res = func(*args, **kwargs)
return res
except WSGIHTTPException as ex:
forget(None, ex)
raise ex
return _inner