blob: 34854ad0f01ab134665f75e407f0c11a0f54ad95 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import sys
import re
from functools import wraps
import contextlib
from six.moves.urllib.parse import parse_qs
from import assert_equal
from ming.orm.ormsession import ThreadLocalORMSession
from tg import tmpl_context as c
from mock import patch
import tg
from paste.deploy.converters import asbool
from allura import model as M
import allura.config.middleware
def with_user_project(username):
    """Decorator factory ensuring the user project of ``username`` exists.

    Before the decorated callable runs, the wrapper loads the user with
    ``M.User.by_username``, stores it on ``c.user`` and, when no project with
    shortname ``'u/' + username`` is found in the ``Users`` neighborhood,
    registers one via ``register_project(..., user_project=True)``.

    :param str username: username whose user project is required
    :return: a decorator applying the setup above before calling the target
    """
    def _with_user_project(func):
        # wraps() preserves the decorated callable's __name__/__doc__, so
        # test runners and failure reports show the real test name
        @wraps(func)
        def wrapped(*args, **kw):
            user = M.User.by_username(username)
            c.user = user
            n = M.Neighborhood.query.get(name='Users')
            shortname = 'u/' + username
            p = M.Project.query.get(shortname=shortname, neighborhood_id=n._id)
            if not p:
                n.register_project(shortname, user=user, user_project=True)
            return func(*args, **kw)
        return wrapped
    return _with_user_project
@contextlib.contextmanager
def NullContextManager():
    """Return a context manager that does nothing on entry or exit.

    Useful where a ``with`` statement needs *some* context manager but no
    setup or teardown is wanted. The ``as`` target is bound to ``None`` and
    exceptions raised in the body propagate unchanged.
    """
    yield
# Decorator factory: before the decorated callable runs, the wrapper sets
# c.user (looked up by username) and c.project (looked up by
# project_shortname) and, when mount_point is given and the project has no
# app instance there yet, calls p.install_app() for the tool named by ep_name.
# NOTE(review): this block looks damaged -- the parameter list below is never
# closed (yet **override_options is used further down), two assignments have
# no target, and several branches have no body. Confirm against version
# control before relying on it.
def with_tool(project_shortname, ep_name, mount_point=None, mount_label=None,
ordinal=None, post_install_hook=None, username='test-admin',
def _with_tool(func):
def wrapped(*args, **kw):
# run as the requested user, against the requested project
c.user = M.User.by_username(username)
p = M.Project.query.get(shortname=project_shortname)
c.project = p
# NOTE(review): the assignment target before "=" is missing -- presumably the
# newly installed app is stored somewhere; confirm
if mount_point and not p.app_instance(mount_point): = p.install_app(
ep_name, mount_point, mount_label, ordinal, **override_options)
# NOTE(review): no statement follows this check -- presumably the hook is
# invoked here; confirm how it is called
if post_install_hook:
# when the 'smtp.mock' setting is truthy, smtplib.SMTP in allura.lib.mail_util
# is patched while the loop below runs
if asbool(tg.config.get('smtp.mock')):
smtp_mock = patch('allura.lib.mail_util.smtplib.SMTP')
# NOTE(review): presumably the else-branch of the check above (otherwise the
# patch would always be overwritten) -- confirm
smtp_mock = NullContextManager()
with smtp_mock:
# loops for as long as M.MonQTask.run_ready('setup') returns a truthy value
# NOTE(review): the loop body is missing
while M.MonQTask.run_ready('setup'):
# tool already mounted: look the existing instance up instead of installing
# NOTE(review): the assignment target before "=" is missing here as well
elif mount_point: = p.app_instance(mount_point)
return func(*args, **kw)
return wrapped
return _with_tool
# Ready-made decorators: each one applies with_tool() for a single tool on the
# 'test' project at a fixed mount point.
with_discussion = with_tool(project_shortname='test', ep_name='Discussion', mount_point='discussion')
with_link = with_tool(project_shortname='test', ep_name='Link', mount_point='link')
with_tracker = with_tool(project_shortname='test', ep_name='Tickets', mount_point='bugs')
with_wiki = with_tool(project_shortname='test', ep_name='Wiki', mount_point='wiki')
with_url = with_tool(project_shortname='test', ep_name='ShortUrl', mount_point='url')
class raises:
    """
    Test helper in the form of a context manager, to assert that something raises an exception.
    After completion, the 'exc' attribute can be used to do further inspection of the exception

    :param ExcType: expected exception class, or a tuple of classes (anything
        ``issubclass`` accepts as its second argument)
    :raises AssertionError: on exit, if the body raised nothing
    """

    def __init__(self, ExcType):
        self.ExcType = ExcType
        self.exc = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_t):
        if exc_type is None:
            # 1-tuple wrapper: a bare tuple of exception types on the right of
            # ``%`` would be unpacked as format arguments and raise TypeError
            raise AssertionError('Did not raise %s' % (self.ExcType,))
        self.exc = exc_val
        # True swallows the expected exception; False lets any other
        # exception propagate untouched
        return issubclass(exc_type, self.ExcType)
def without_module(*module_names):
    """Decorator factory that hides the named modules for the duration of a call.

    While the decorated callable runs, every name in ``module_names`` is
    mapped to ``None`` in ``sys.modules`` (a ``None`` entry makes
    ``import <name>`` fail); ``patch.dict`` restores the previous entries
    when the call finishes.

    :param module_names: module names to hide
    :return: a decorator
    """
    def _without_module(func):
        # wraps() preserves the decorated callable's __name__/__doc__
        @wraps(func)
        def wrapped(*a, **kw):
            with patch.dict(sys.modules, {m: None for m in module_names}):
                return func(*a, **kw)
        return wrapped
    return _without_module
class patch_middleware_config:
    """
    Context manager that patches the configuration used during middleware
    setup for Allura

    While active, ``allura.config.middleware.make_app`` is replaced by a
    wrapper that merges ``new_configs`` into the ``app_conf`` keyword
    arguments before delegating to the real ``make_app``. The real function
    is put back on exit.

    :param dict new_configs: settings that override ``app_conf`` entries
    """

    def __init__(self, new_configs):
        self.new_configs = new_configs

    def __enter__(self):
        # keep the real factory: the wrapper delegates to it and __exit__
        # restores it
        self._make_app = allura.config.middleware.make_app

        def make_app(global_conf, full_stack=True, **app_conf):
            # apply the overrides; without this the wrapper is a plain
            # pass-through and nothing is patched
            app_conf.update(self.new_configs)
            return self._make_app(global_conf, full_stack, **app_conf)

        allura.config.middleware.make_app = make_app
        return self

    def __exit__(self, exc_type, exc_val, exc_t):
        allura.config.middleware.make_app = self._make_app
def audits(*messages, **kwargs):
    """
    Asserts all the messages exist in audit log

    :param messages: regex strings
    :param bool user: if this is a user log; each regex is then prefixed
        with a pattern for the user-log header lines
    :param str actor: regex for the "Done by user" line (default ``.*``)
    :param str ip_addr: regex for the "IP Address" line (default ``.*``)
    :param str user_agent: regex for the "User-Agent" line (default ``.*``)
    :raises AssertionError: for the first message with no matching entry
    """
    preamble = ''
    if kwargs.get('user'):
        actor = kwargs.get('actor', '.*')
        ip_addr = kwargs.get('ip_addr', '.*')
        user_agent = kwargs.get('user_agent', '.*')
        preamble = f'(Done by user: {actor}\n)?IP Address: {ip_addr}\nUser-Agent: {user_agent}\n'
    for message in messages:
        pattern = re.compile(preamble + message)
        if M.AuditLog.query.find(dict(message=pattern)).count():
            continue
        # nothing matched: build a helpful failure message
        hints = ''
        entries = M.AuditLog.query.find().all()
        if len(entries) < 10:
            # only list the log when it is short
            hints += '\nin these AuditLog messages:\n\t' + '\n\t'.join(a.message for a in entries)
        if message != re.escape(message):
            hints += '\nYou may need to escape the regex chars in the text you are matching'
        raise AssertionError(f'Could not find "{message}"{hints}')
def out_audits(*messages, **kwargs):
    """
    Asserts none the messages exist in audit log. "without audits"

    :param messages: list of regex strings
    :param bool user: if this is a user log; each regex is then prefixed
        with a pattern for the user-log header lines
    :param str actor: regex for the "Done by user" line (default ``.*``)
    :param str ip_addr: regex for the "IP Address" line (default ``.*``)
    :param str user_agent: regex for the "User-Agent" line (default ``.*``)
    :raises AssertionError: for the first message that has a matching entry
    """
    if kwargs.get('user'):
        actor = kwargs.get('actor', '.*')
        ip_addr = kwargs.get('ip_addr', '.*')
        user_agent = kwargs.get('user_agent', '.*')
        # The User-Agent line is matched optionally: entries carrying it (the
        # header shape audits() expects) are detected, and anything the
        # shorter preamble used to match still matches.
        preamble = (
            f'(Done by user: {actor}\n)?IP Address: {ip_addr}\n'
            f'(User-Agent: {user_agent}\n)?'
        )
    else:
        preamble = ''
    for message in messages:
        pattern = re.compile(preamble + message)
        if M.AuditLog.query.find(dict(message=pattern)).count():
            # explicit raise (not ``assert``) so the check survives ``python -O``
            raise AssertionError('Found unexpected: "%s"' % message)
# not a decorator but use it with LogCapture() context manager
def assert_logmsg(logs, msg, maxlevel=logging.CRITICAL+1):
    """
    Assert that some captured record contains ``msg`` and that no record is
    logged above ``maxlevel``.

    can also use logs.check() or logs.check_present()

    :param testfixtures.logcapture.LogCapture logs: LogCapture() instance
    :param str msg: Message substring to look for
    :param int maxlevel: highest ``levelno`` a record may have
    :raises AssertionError: on a record above ``maxlevel``, or when no
        record contains ``msg``
    """
    seen = False
    for record in logs.records:
        text = record.getMessage()
        seen = seen or msg in text
        if record.levelno > maxlevel:
            raise AssertionError(f'unexpected log {record.levelname} {text}')
    assert seen, 'Did not find "{}" in these logs: {}'.format(
        msg, '\n'.join(rec.getMessage() for rec in logs.records))
def assert_logmsg_and_no_warnings_or_errors(logs, msg):
    """
    Same check as assert_logmsg(), with the level ceiling set to
    ``logging.INFO`` so any record above INFO fails the assertion.

    can also use logs.check() or logs.check_present()

    :param testfixtures.logcapture.LogCapture logs: LogCapture() instance
    :param str msg: Message substring to look for
    """
    return assert_logmsg(logs, msg, logging.INFO)
def assert_equivalent_urls(url1, url2):
    """Assert two URLs are equal, ignoring the order of query parameters.

    Each URL is split at its first ``?``. The parts before it must be equal,
    and the query strings must parse (via ``parse_qs``) to equal mappings.

    :param str url1: first URL
    :param str url2: second URL
    :raises AssertionError: if the URLs are not equivalent
    """
    base1, _, qs1 = url1.partition('?')
    base2, _, qs2 = url2.partition('?')
    left = (base1, parse_qs(qs1))
    right = (base2, parse_qs(qs2))
    # the comparison must actually be made; merely building the two tuples
    # asserts nothing
    if left != right:
        raise AssertionError(f'{left!r} != {right!r}')