blob: 34b724423c3d5ce21b583399e4de9c1c06950138 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Tests for Apache(TM) Bloodhound's product environments"""
from inspect import stack
import os.path
import shutil
from sqlite3 import OperationalError
import sys
import tempfile
from types import MethodType
if sys.version_info < (2, 7):
import unittest2 as unittest
from unittest2.case import _AssertRaisesContext
else:
import unittest
from unittest.case import _AssertRaisesContext
from trac.config import Option
from trac.core import Component, ComponentMeta
from trac.env import Environment
from trac.test import EnvironmentStub, MockPerm
from trac.tests.env import EnvironmentTestCase
from trac.ticket.report import ReportModule
from trac.ticket.web_ui import TicketModule
from trac.util.text import to_unicode
from trac.web.href import Href
from multiproduct.api import MultiProductSystem
from multiproduct.env import ProductEnvironment
from multiproduct.model import Product
class ProductEnvironmentStub(ProductEnvironment):
r"""A product environment slightly tweaked for testing purposes
"""
def get_known_users(self, cnx=None):
return self.known_users
# FIXME: Subclass TestCase explictly ?
class MultiproductTestCase(unittest.TestCase):
r"""Mixin providing access to multi-product testing extensions.
This class serves to the purpose of upgrading existing Trac test cases
with multi-product super-powers while still providing the foundations
to create product-specific subclasses.
"""
# unittest2 extensions
exceptFailureMessage = None
class _AssertRaisesLoggingContext(_AssertRaisesContext):
"""Add logging capabilities to assertRaises
"""
def __init__(self, expected, test_case, expected_regexp=None):
_AssertRaisesContext.__init__(self, expected, test_case,
expected_regexp)
self.test_case = test_case
@staticmethod
def _tb_locals(tb):
if tb is None:
# Inspect interpreter stack two levels up
ns = stack()[2][0].f_locals.copy()
else:
# Traceback already in context
ns = tb.tb_frame.f_locals.copy()
ns.pop('__builtins__', None)
return ns
def __exit__(self, exc_type, exc_value, tb):
try:
return _AssertRaisesContext.__exit__(self, exc_type,
exc_value, tb)
except self.failureException, exc:
msg = self.test_case.exceptFailureMessage
if msg is not None:
standardMsg = str(exc)
msg = msg % self._tb_locals(tb)
msg = self.test_case._formatMessage(msg, standardMsg)
raise self.failureException(msg)
else:
raise
finally:
# Clear message placeholder
self.test_case.exceptFailureMessage = None
def assertRaises(self, excClass, callableObj=None, *args, **kwargs):
"""Adds logging capabilities on top of unittest2 implementation.
"""
if callableObj is None:
return self._AssertRaisesLoggingContext(excClass, self)
else:
return unittest.TestCase.assertRaises(self, excClass, callableObj,
*args, **kwargs)
# Product data
default_product = 'tp1'
MAX_TEST_PRODUCT = 3
PRODUCT_DATA = {
'tp1': {
'prefix': 'tp1',
'name': 'test product 1',
'description': 'desc for tp1',
},
'tp2': {
'prefix': 'tp2',
'name': 'test product 2',
'description': 'desc for tp2',
},
u'xü': {
'prefix': u'xü',
'name': 'Non-ASCII chars',
'description': 'Unicode chars in name',
},
u'Überflüssigkeit': {
'prefix': u'Überflüssigkeit',
'name': 'Non-ASCII chars (long)',
'description': 'Long name with unicode chars',
},
'Foo Bar': {
'prefix': 'Foo Bar',
'name': 'Whitespaces',
'description': 'Whitespace chars in name',
},
'Foo Bar#baz': {
'prefix': 'Foo Bar#baz',
'name': 'Non-alphanumeric',
'description': 'Special chars in name',
},
'pl/de': {
'prefix': 'pl/de',
'name': 'Path separator',
'description': 'URL path separator in name',
},
}
# Test setup
def _setup_test_env(self, create_folder=True, path=None, **kwargs):
r"""Prepare a new test environment .
Optionally set its path to a meaningful location (temp folder
if `path` is `None`).
"""
MultiProductSystem.FakePermClass = MockPerm
kwargs.setdefault('enable', ['trac.*', 'multiproduct.*'])
self.env = env = EnvironmentStub(**kwargs)
if create_folder:
if path is None:
env.path = tempfile.mkdtemp('bh-product-tempenv')
else:
env.path = path
if not os.path.exists(path):
os.mkdir(path)
return env
def _setup_test_log(self, env):
r"""Ensure test product with prefix is loaded
"""
logdir = tempfile.gettempdir()
logpath = os.path.join(logdir, 'trac-testing.log')
config = env.config
config.set('logging', 'log_file', logpath)
config.set('logging', 'log_type', 'file')
config.set('logging', 'log_level', 'DEBUG')
# Log SQL queries
config.set('trac', 'debug_sql', True)
config.save()
env.setup_log()
env.log.info('%s test case: %s %s', '-' * 10, self.id(), '-' * 10)
# Clean-up logger instance and associated handler
# Otherwise large test suites will only result in ERROR eventually
# (at least in Unix systems) with messages
#
# TracError: Error reading '/path/to/file', make sure it is readable.
# error: /path/to/: Too many open files
self.addCleanup(self._teardown_test_log, env)
def _teardown_test_log(self, env):
if env.log and hasattr(env, '_log_handler'):
env.log.removeHandler(env._log_handler)
env._log_handler.flush()
env._log_handler.close()
del env._log_handler
@classmethod
def _load_product_from_data(cls, env, prefix):
r"""Ensure test product with prefix is loaded
"""
# TODO: Use fixtures implemented in #314
product_data = cls.PRODUCT_DATA[prefix]
prefix = to_unicode(prefix)
product = Product(env)
product._data.update(product_data)
product.insert()
@classmethod
def _upgrade_mp(cls, env):
r"""Apply multi product upgrades
"""
# Do not break wiki parser ( see #373 )
env.disable_component(TicketModule)
env.disable_component(ReportModule)
mpsystem = MultiProductSystem(env)
try:
mpsystem.upgrade_environment(env.db_transaction)
except OperationalError:
# Database is upgraded, but database version was deleted.
# Complete the upgrade by inserting default product.
self.mpsystem._insert_default_product(env.db_transaction)
# assume that the database schema has been upgraded, enable
# multi-product schema support in environment
env.enable_multiproduct_schema(True)
@classmethod
def _load_default_data(cls, env):
r"""Initialize environment with default data by respecting
values set in system table.
"""
from trac import db_default
env.log.debug('Loading default data')
with env.db_transaction as db:
for table, cols, vals in db_default.get_data(db):
if table != 'system':
db.executemany('INSERT INTO %s (%s) VALUES (%s)'
% (table, ','.join(cols),
','.join(['%s' for c in cols])), vals)
env.log.debug('Loaded default data')
def _mp_setup(self, **kwargs):
"""Shortcut for quick product-aware environment setup.
"""
self.env = self._setup_test_env(**kwargs)
self._upgrade_mp(self.env)
self._setup_test_log(self.env)
self._load_product_from_data(self.env, self.default_product)
class ProductEnvTestCase(EnvironmentTestCase, MultiproductTestCase):
r"""Test cases for Trac environments rewritten for product environments
"""
# Test setup
def setUp(self):
r"""Replace Trac environment with product environment
"""
EnvironmentTestCase.setUp(self)
try:
self.global_env = self.env
self._setup_test_log(self.global_env)
self._upgrade_mp(self.global_env)
self._load_product_from_data(self.global_env, self.default_product)
try:
self.env = ProductEnvironment(self.global_env,
self.default_product)
except:
# All tests should fail if anything goes wrong
self.global_env.log.exception(
'Error creating product environment')
self.env = None
except:
shutil.rmtree(self.env.path)
raise
def tearDown(self):
# Discard product environment
self.env = self.global_env
EnvironmentTestCase.tearDown(self)
class ProductEnvApiTestCase(MultiproductTestCase):
"""Assertions for Apache(TM) Bloodhound product-specific extensions in
[https://issues.apache.org/bloodhound/wiki/Proposals/BEP-0003 BEP 3]
"""
def setUp(self):
self._mp_setup()
self.product_env = ProductEnvironment(self.env, self.default_product)
def tearDown(self):
# Release reference to transient environment mock object
if self.env is not None:
try:
self.env.reset_db()
except OperationalError:
# "Database not found ...",
# "OperationalError: no such table: system" or the like
pass
self.env = None
self.product_env = None
def test_attr_forward_parent(self):
"""Testing env.__getattr__"""
class EnvironmentAttrSandbox(EnvironmentStub):
"""Limit the impact of class edits so as to avoid race conditions
"""
self.longMessage = True
class AttrSuccess(Exception):
"""Exception raised when target method / property is actually
invoked.
"""
def property_mock(attrnm, expected_self):
def assertAttrFwd(instance):
self.assertIs(instance, expected_self,
"Mismatch in property '%s'" % (attrnm,))
raise AttrSuccess
return property(assertAttrFwd)
self.env.__class__ = EnvironmentAttrSandbox
try:
for attrnm in 'system_info_providers secure_cookies ' \
'project_admin_trac_url get_system_info get_version ' \
'get_templates_dir get_templates_dir get_log_dir ' \
'backup'.split():
original = getattr(Environment, attrnm)
if isinstance(original, MethodType):
translation = getattr(self.product_env, attrnm)
self.assertIs(translation.im_self, self.env,
"'%s' not bound to global env in product env"
% (attrnm,))
self.assertIs(translation.im_func, original.im_func,
"'%s' function differs in product env"
% (attrnm,))
elif isinstance(original, (property, Option)):
# Intercept property access e.g. properties, Option, ...
setattr(self.env.__class__, attrnm,
property_mock(attrnm, self.env))
self.exceptFailureMessage = 'Property %(attrnm)s'
with self.assertRaises(AttrSuccess) as cm_test_attr:
getattr(self.product_env, attrnm)
else:
self.fail("Environment member %s has unexpected type"
% (repr(original),))
finally:
self.env.__class__ = EnvironmentStub
for attrnm in 'component_activated _component_rules ' \
'enable_component get_known_users get_repository ' \
'_component_name'.split():
original = getattr(Environment, attrnm)
if isinstance(original, MethodType):
translation = getattr(self.product_env, attrnm)
self.assertIs(translation.im_self, self.product_env,
"'%s' not bound to product env" % (attrnm,))
self.assertIs(translation.im_func, original.im_func,
"'%s' function differs in product env"
% (attrnm,))
elif isinstance(original, property):
translation = getattr(ProductEnvironment, attrnm)
self.assertIs(original, translation,
"'%s' property differs in product env"
% (attrnm,))
def test_typecheck(self):
"""Testing env.__init__"""
self._load_product_from_data(self.env, 'tp2')
with self.assertRaises(TypeError) as cm_test:
new_env = ProductEnvironment(self.product_env, 'tp2')
msg = str(cm_test.exception)
expected_msg = "Initializer must be called with " \
"trac.env.Environment instance as first argument " \
"(got multiproduct.env.ProductEnvironment instance " \
"instead)"
self.assertEqual(msg, expected_msg)
def test_component_enable(self):
"""Testing env.is_component_enabled"""
class C(Component):
pass
# Let's pretend this was declared elsewhere
C.__module__ = 'dummy_module'
global_env = self.env
product_env = self.product_env
def _test_component_enabled(cls):
cname = global_env._component_name(cls)
disable_component_in_config = global_env.disable_component_in_config
enable_component_in_config = global_env.enable_component_in_config
# cls initially disabled in both envs
disable_component_in_config(global_env, cls)
disable_component_in_config(product_env, cls)
expected_rules = {
'multiproduct': True,
'trac': True,
'trac.db': True,
cname: False,
}
self.assertEquals(expected_rules, global_env._component_rules)
self.assertEquals(expected_rules, product_env._component_rules)
self.assertFalse(global_env.is_component_enabled(cls))
self.assertFalse(product_env.is_component_enabled_local(cls))
self.assertIs(global_env[cls], None)
self.assertIs(product_env[cls], None)
# cls enabled in product env but not in global env
disable_component_in_config(global_env, cls)
enable_component_in_config(product_env, cls)
expected_rules[cname] = False
self.assertEquals(expected_rules, global_env._component_rules)
expected_rules[cname] = True
self.assertEquals(expected_rules, product_env._component_rules)
self.assertFalse(global_env.is_component_enabled(cls))
self.assertTrue(product_env.is_component_enabled_local(cls))
self.assertIs(global_env[cls], None)
self.assertIs(product_env[cls], None)
# cls enabled in both envs
enable_component_in_config(global_env, cls)
enable_component_in_config(product_env, cls)
expected_rules[cname] = True
self.assertEquals(expected_rules, global_env._component_rules)
expected_rules[cname] = True
self.assertEquals(expected_rules, product_env._component_rules)
self.assertTrue(global_env.is_component_enabled(cls))
self.assertTrue(product_env.is_component_enabled_local(cls))
self.assertIsNot(global_env[cls], None)
self.assertIsNot(product_env[cls], None)
# cls enabled in global env but not in product env
enable_component_in_config(global_env, cls)
disable_component_in_config(product_env, cls)
expected_rules[cname] = True
self.assertEquals(expected_rules, global_env._component_rules)
expected_rules[cname] = False
self.assertEquals(expected_rules, product_env._component_rules)
self.assertTrue(global_env.is_component_enabled(cls))
self.assertFalse(product_env.is_component_enabled_local(cls))
self.assertIsNot(global_env[cls], None)
self.assertIs(product_env[cls], None)
# Test the rules against custom , external component
_test_component_enabled(C)
for env in (global_env, product_env):
env.config.remove('components', env._component_name(C))
# Test the rules against Trac component class
_test_component_enabled(TicketModule)
# ComponentMeta._components is shared between multiple tests.
# Unregister class C as its fake module might break something else.
ComponentMeta._components.remove(C)
def test_path(self):
"""Testing env.path"""
self.assertEqual(self.product_env.path,
os.path.join(self.env.path, 'products',
self.default_product))
def test_env_config_inheritance(self):
"""Testing env.config"""
global_config = self.env.config
product_config = self.product_env.config
# By default inherit global settings ...
global_config['section'].set('key', 'value1')
self.assertEquals('value1', global_config['section'].get('key'))
self.assertEquals('value1', product_config['section'].get('key'))
# ... but allow for overrides in product scope
product_config['section'].set('key', 'value2')
self.assertEquals('value1', global_config['section'].get('key'))
self.assertEquals('value2', product_config['section'].get('key'))
def test_parametric_singleton(self):
self.assertIs(self.product_env,
ProductEnvironment(self.env, self.default_product))
for prefix in self.PRODUCT_DATA:
if prefix != self.default_product:
self._load_product_from_data(self.env, prefix)
envgen1 = dict([prefix, ProductEnvironment(self.env, prefix)]
for prefix in self.PRODUCT_DATA)
envgen2 = dict([prefix, ProductEnvironment(self.env, prefix)]
for prefix in self.PRODUCT_DATA)
for prefix, env1 in envgen1.iteritems():
self.assertIs(env1, envgen2[prefix],
"Identity check (by prefix) '%s'" % (prefix,))
for prefix, env1 in envgen1.iteritems():
self.assertIs(env1, envgen2[prefix],
"Identity check (by prefix) '%s'" % (prefix,))
def load_product(prefix):
products = Product.select(self.env, where={'prefix' : prefix})
if not products:
raise LookupError('Missing product %s' % (prefix,))
else:
return products[0]
envgen3 = dict([prefix, ProductEnvironment(self.env,
load_product(prefix))]
for prefix in self.PRODUCT_DATA)
for prefix, env1 in envgen1.iteritems():
self.assertIs(env1, envgen3[prefix],
"Identity check (by product model) '%s'" % (prefix,))
class ProductEnvHrefTestCase(MultiproductTestCase):
"""Assertions for resolution of product environment's base URL
[https://issues.apache.org/bloodhound/wiki/Proposals/BEP-0003 BEP 3]
"""
def product_base_url(url_template):
def decorator(f):
f.product_base_url = url_template
return f
return decorator
def setUp(self):
self._mp_setup()
self.env.path = '/path/to/env'
self.env.abs_href = Href('http://globalenv.com/trac.cgi')
url_pattern = getattr(getattr(self, self._testMethodName).im_func,
'product_base_url', '')
self.env.config.set('multiproduct', 'product_base_url', url_pattern)
self.env.config.set('trac', 'base_url', 'http://globalenv.com/trac.cgi')
self.product_env = ProductEnvironment(self.env, self.default_product)
def tearDown(self):
# Release reference to transient environment mock object
if self.env is not None:
try:
self.env.reset_db()
except OperationalError:
# "Database not found ...",
# "OperationalError: no such table: system" or the like
pass
self.env = None
self.product_env = None
@product_base_url('http://$(prefix)s.domain.tld/')
def test_href_subdomain(self):
"""Test product sub domain base URL
"""
self.assertEqual('http://tp1.domain.tld', self.product_env.abs_href())
@product_base_url('/path/to/bloodhound/$(prefix)s')
def test_href_sibling_paths(self):
"""Test product base URL at sibling paths
"""
self.assertEqual('http://globalenv.com/trac.cgi/path/to/bloodhound/tp1',
self.product_env.abs_href())
@product_base_url('/$(envname)s/$(prefix)s')
def test_href_inherit_sibling_paths(self):
"""Test product base URL at sibling paths inheriting configuration.
"""
self.assertEqual('http://globalenv.com/trac.cgi/env/tp1',
self.product_env.abs_href())
@product_base_url('')
def test_href_default(self):
"""Test product base URL is to a default
"""
self.assertEqual('http://globalenv.com/trac.cgi/products/tp1',
self.product_env.abs_href())
@product_base_url('/products/$(prefix)s')
def test_href_embed(self):
"""Test default product base URL /products/prefix
"""
self.assertEqual('http://globalenv.com/trac.cgi/products/tp1',
self.product_env.abs_href())
@product_base_url('http://$(envname)s.tld/bh/$(prefix)s')
def test_href_complex(self):
"""Test complex product base URL
"""
self.assertEqual('http://env.tld/bh/tp1', self.product_env.abs_href())
@product_base_url('http://$(prefix)s.$(envname)s.tld/')
def test_product_href_uses_multiproduct_product_base_url(self):
"""Test that [multiproduct] product_base_url is used to compute
abs_href for the product environment when [trac] base_url for
the product environment is an empty string (the default).
"""
# Global URLs
self.assertEqual('http://globalenv.com/trac.cgi', self.env.base_url)
self.assertEqual('http://globalenv.com/trac.cgi', self.env.abs_href())
# Product URLs
self.assertEqual('', self.product_env.base_url)
self.assertEqual('http://tp1.env.tld', self.product_env.abs_href())
@product_base_url('http://$(prefix)s.$(envname)s.tld/')
def test_product_href_uses_products_base_url(self):
"""Test that [trac] base_url for the product environment is used to
compute abs_href for the product environment when [trac] base_url
for the product environment is different than [trac] base_url for
the global environment.
"""
self.product_env.config.set('trac', 'base_url', 'http://productenv.com')
self.product_env.config.save()
self.assertEqual('http://productenv.com', self.product_env.base_url)
self.assertEqual('http://productenv.com', self.product_env.abs_href())
@product_base_url('http://$(prefix)s.$(envname)s.tld/')
def test_product_href_global_and_product_base_urls_same(self):
"""Test that [multiproduct] product_base_url is used to compute
abs_href for the product environment when [trac] base_url is the same
for the product and global environment.
"""
self.product_env.config.set('trac', 'base_url',
self.env.config.get('trac', 'base_url'))
self.product_env.config.save()
self.assertEqual('', self.product_env.base_url)
self.assertEqual('http://tp1.env.tld', self.product_env.abs_href())
product_base_url = staticmethod(product_base_url)
def test_suite():
return unittest.TestSuite([
unittest.makeSuite(ProductEnvTestCase, 'test'),
unittest.makeSuite(ProductEnvApiTestCase, 'test'),
unittest.makeSuite(ProductEnvHrefTestCase, 'test'),
])
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')