blob: 0789bfac1cacac0ca563cbffa8e5f83060786a33 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from sqlite3 import OperationalError
from contextlib import contextmanager
import os
import shutil
import sys
import tempfile
import uuid
if sys.version_info < (2, 7):
import unittest2 as unittest
else:
import unittest
from trac.attachment import Attachment, AttachmentAdmin
from trac.core import Component, implements
from trac.db import DatabaseManager
from trac.db.schema import Table, Column
from trac.env import IEnvironmentSetupParticipant
from trac.test import Environment
from trac.ticket import Ticket
from trac.wiki import WikiPage
from multiproduct.api import MultiProductSystem
from multiproduct.env import ProductEnvironment
from multiproduct.model import Product
BLOODHOUND_TABLES = (
'bloodhound_product',
'bloodhound_productconfig',
'bloodhound_productresourcemap',
)
TABLES_WITH_PRODUCT_FIELD = (
'ticket', 'ticket_change', 'ticket_custom', 'attachment', 'component',
'milestone', 'wiki', 'report',
'version', 'enum', 'permission', 'system',
)
class EnvironmentUpgradeTestCase(unittest.TestCase):
def setUp(self, options=()):
self.env_path = tempfile.mkdtemp('multiproduct-tempenv')
self.env = Environment(self.env_path, create=True, options=options)
DummyPlugin.version = 1
def tearDown(self):
shutil.rmtree(self.env_path)
def test_can_upgrade_environment_with_multi_product_disabled(self):
self.env.upgrade()
# Multiproduct was not enabled so multiproduct tables should not exist
for table in BLOODHOUND_TABLES:
with self.assertFailsWithMissingTable():
self.env.db_direct_query("SELECT * FROM %s" % table)
for table in TABLES_WITH_PRODUCT_FIELD:
with self.assertFailsWithMissingColumn():
self.env.db_direct_query("SELECT product FROM %s" % table)
def test_upgrade_creates_multi_product_tables_and_adds_product_column(self):
self._enable_multiproduct()
self.env.upgrade()
with self.env.db_direct_transaction as db:
for table in BLOODHOUND_TABLES:
db("SELECT * FROM %s" % table)
for table in TABLES_WITH_PRODUCT_FIELD:
db("SELECT product FROM %s" % table)
def test_upgrade_creates_default_product(self):
self._enable_multiproduct()
self.env.upgrade()
products = Product.select(self.env)
self.assertEqual(len(products), 1)
def test_upgrade_moves_tickets_and_related_objects_to_default_prod(self):
self._add_custom_field('custom_field')
with self.env.db_direct_transaction as db:
db("""INSERT INTO ticket (id) VALUES (1)""")
db("""INSERT INTO attachment (type, id, filename)
VALUES ('ticket', '1', '')""")
db("""INSERT INTO ticket_custom (ticket, name, value)
VALUES (1, 'custom_field', '42')""")
db("""INSERT INTO ticket_change (ticket, time, field)
VALUES (1, 42, 'summary')""")
self._enable_multiproduct()
self.env.upgrade()
with self.product('@'):
ticket = Ticket(self.env, 1)
attachments = list(Attachment.select(self.env,
ticket.resource.realm,
ticket.resource.id))
self.assertEqual(len(attachments), 1)
self.assertEqual(ticket['custom_field'], '42')
changes = ticket.get_changelog()
self.assertEqual(len(changes), 3)
def test_upgrade_moves_custom_wikis_to_default_product(self):
with self.env.db_direct_transaction as db:
db("""INSERT INTO wiki (name, version) VALUES ('MyPage', 1)""")
db("""INSERT INTO attachment (type, id, filename)
VALUES ('wiki', 'MyPage', '')""")
self._enable_multiproduct()
self.env.upgrade()
with self.env.db_direct_transaction as db:
self.assertEqual(
len(db("""SELECT * FROM wiki WHERE product='@'""")), 1)
self.assertEqual(
len(db("""SELECT * FROM attachment
WHERE product='@'
AND type='wiki'""")), 1)
def test_upgrade_moves_system_wikis_to_products(self):
with self.env.db_direct_transaction as db:
db("""INSERT INTO wiki (name, version) VALUES ('WikiStart', 1)""")
db("""INSERT INTO attachment (type, id, filename)
VALUES ('wiki', 'WikiStart', '')""")
self._enable_multiproduct()
self.env.upgrade()
with self.env.db_direct_transaction as db:
self.assertEqual(
len(db("""SELECT * FROM wiki WHERE product='@'""")), 1)
self.assertEqual(
len(db("""SELECT * FROM attachment
WHERE product='@'
AND type='wiki'""")), 1)
self.assertEqual(
len(db("""SELECT * FROM wiki WHERE product=''""")), 0)
self.assertEqual(
len(db("""SELECT * FROM attachment
WHERE product=''
AND type='wiki'""")), 0)
def test_upgrade_copies_content_of_system_tables_to_all_products(self):
mp = MultiProductSystem(self.env)
with self.env.db_direct_transaction as db:
mp._add_column_product_to_ticket(db)
mp._create_multiproduct_tables(db)
mp._update_db_version(db, 1)
for i in range(1, 6):
db("""INSERT INTO bloodhound_product (prefix, name)
VALUES ('p%d', 'Product 1')""" % i)
for table in ('component', 'milestone', 'enum', 'version',
'permission', 'report'):
db("""DELETE FROM %s""" % table)
db("""INSERT INTO component (name) VALUES ('foobar')""")
db("""INSERT INTO milestone (name) VALUES ('foobar')""")
db("""INSERT INTO version (name) VALUES ('foobar')""")
db("""INSERT INTO enum (type, name) VALUES ('a', 'b')""")
db("""INSERT INTO permission VALUES ('x', 'TICKET_VIEW')""")
db("""INSERT INTO report (title) VALUES ('x')""")
self._enable_multiproduct()
self.env.upgrade()
with self.env.db_direct_transaction as db:
for table in ('component', 'milestone', 'version', 'enum',
'report'):
rows = db("SELECT * FROM %s" % table)
self.assertEqual(
len(rows), 6,
"Wrong number of lines in %s (%d instead of %d)\n%s"
% (table, len(rows), 6, rows))
for table in ('permission',):
# Permissions also hold rows for global product.
rows = db("SELECT * FROM %s WHERE username='x'" % table)
self.assertEqual(
len(rows), 7,
"Wrong number of lines in %s (%d instead of %d)\n%s"
% (table, len(rows), 7, rows))
def test_upgrading_database_moves_attachment_to_correct_product(self):
ticket = self.insert_ticket('ticket')
wiki = self.insert_wiki('MyWiki')
attachment = self._create_file_with_content('Hello World!')
self.add_attachment(ticket.resource, attachment)
self.add_attachment(wiki.resource, attachment)
self._enable_multiproduct()
self.env.upgrade()
with self.product('@'):
attachments = list(
Attachment.select(self.env, 'ticket', ticket.id))
attachments.extend(
Attachment.select(self.env, 'wiki', wiki.name))
self.assertEqual(len(attachments), 2)
for attachment in attachments:
self.assertEqual(attachment.open().read(), 'Hello World!')
def test_can_upgrade_database_with_ticket_attachment_with_text_ids(self):
with self.env.db_direct_transaction as db:
db("""INSERT INTO attachment (id, type, filename)
VALUES ('abc', 'ticket', '')""")
self._enable_multiproduct()
self.env.upgrade()
def test_can_upgrade_database_with_orphaned_attachments(self):
with self.env.db_direct_transaction as db:
db("""INSERT INTO attachment (id, type, filename)
VALUES ('5', 'ticket', '')""")
db("""INSERT INTO attachment (id, type, filename)
VALUES ('MyWiki', 'wiki', '')""")
self._enable_multiproduct()
self.env.upgrade()
def test_can_upgrade_multi_product_from_v1(self):
mp = MultiProductSystem(self.env)
with self.env.db_direct_transaction as db:
mp._add_column_product_to_ticket(db)
mp._create_multiproduct_tables(db)
mp._update_db_version(db, 1)
db("""INSERT INTO bloodhound_product (prefix, name)
VALUES ('p1', 'Product 1')""")
db("""INSERT INTO ticket (id, product)
VALUES (1, 'Product 1')""")
self._enable_multiproduct()
self.env.upgrade()
with self.product('p1'):
Ticket(self.env, 1)
def test_can_upgrade_multi_product_from_v2(self):
mp = MultiProductSystem(self.env)
with self.env.db_direct_transaction as db:
mp._add_column_product_to_ticket(db)
mp._create_multiproduct_tables(db)
mp._replace_product_on_ticket_with_product_prefix(db)
mp._update_db_version(db, 2)
db("""INSERT INTO bloodhound_product (prefix, name)
VALUES ('p1', 'Product 1')""")
db("""INSERT INTO ticket (id, product)
VALUES (1, 'p1')""")
db("""INSERT INTO ticket (id)
VALUES (2)""")
self._enable_multiproduct()
self.env.upgrade()
with self.product('p1'):
Ticket(self.env, 1)
with self.product('@'):
Ticket(self.env, 2)
def test_upgrade_plugin(self):
self._enable_component(DummyPlugin)
self.env.upgrade()
with self.env.db_direct_transaction as db:
db("SELECT v1 FROM dummy_table")
with self.assertFailsWithMissingColumn():
db("SELECT v2 FROM dummy_table")
DummyPlugin.version = 2
self.env.upgrade()
with self.env.db_direct_transaction as db:
db("SELECT v2 FROM dummy_table")
def test_upgrade_plugin_to_multiproduct(self):
self._enable_multiproduct()
self._enable_component(DummyPlugin)
self.env.upgrade()
with self.env.db_direct_transaction as db:
db("SELECT * FROM dummy_table")
db("""SELECT * FROM "@_dummy_table" """)
def test_upgrade_existing_plugin_to_multiproduct(self):
self._enable_component(DummyPlugin)
self.env.upgrade()
with self.env.db_direct_transaction as db:
with self.assertFailsWithMissingTable():
db("""SELECT * FROM "@_dummy_table" """)
self._enable_multiproduct()
self.env.upgrade()
with self.env.db_direct_transaction as db:
db("SELECT * FROM dummy_table")
db("""SELECT * FROM "@_dummy_table" """)
def test_upgrading_existing_plugin_leaves_data_in_global_env(self):
DummyPlugin.version = 2
self._enable_component(DummyPlugin)
self.env.upgrade()
with self.env.db_direct_transaction as db:
for i in range(5):
db("INSERT INTO dummy_table (v1) VALUES ('%d')" % i)
self.assertEqual(
len(db("SELECT * FROM dummy_table")), 5)
self._enable_multiproduct()
self.env.upgrade()
with self.env.db_direct_transaction as db:
self.assertEqual(
len(db('SELECT * FROM "dummy_table"')), 5)
self.assertEqual(
len(db('SELECT * FROM "@_dummy_table"')), 0)
def test_creating_new_product_calls_environment_created(self):
self._enable_component(DummyPlugin)
self._enable_multiproduct()
self.env.upgrade()
prod = Product(self.env)
prod.update_field_dict(dict(prefix='p1'))
ProductEnvironment(self.env, prod, create=True)
with self.env.db_direct_transaction as db:
db('SELECT * FROM "p1_dummy_table"')
def test_migrating_to_multiproduct_with_custom_default_prefix(self):
ticket = self.insert_ticket('ticket')
self.env.config.set('multiproduct', 'default_product_prefix', 'xxx')
self._enable_multiproduct()
self.env.upgrade()
products = Product.select(self.env)
self.assertEqual(len(products), 1)
self.assertEqual(products[0].prefix, 'xxx')
def _enable_multiproduct(self):
self._update_config('components', 'multiproduct.*', 'enabled')
def _add_custom_field(self, field_name):
self._update_config('ticket-custom', field_name, 'text')
def _enable_component(self, cls):
self._update_config(
'components',
'%s.%s' % (cls.__module__, cls.__name__),
'enabled'
)
def _update_config(self, section, key, value):
self.env.config.set(section, key, value)
self.env.config.save()
self.env = Environment(self.env_path)
def _create_file_with_content(self, content):
filename = str(uuid.uuid4())[:6]
path = os.path.join(self.env_path, filename)
with open(path, 'wb') as f:
f.write(content)
return path
@contextmanager
def assertFailsWithMissingTable(self):
with self.assertRaises(OperationalError) as cm:
yield
self.assertIn('no such table', str(cm.exception))
@contextmanager
def assertFailsWithMissingColumn(self):
with self.assertRaises(OperationalError) as cm:
yield
self.assertIn('no such column', str(cm.exception))
def create_ticket(self, summary, **kw):
ticket = Ticket(self.env)
ticket["summary"] = summary
for k, v in kw.items():
ticket[k] = v
return ticket
def insert_ticket(self, summary, **kw):
"""Helper for inserting a ticket into the database"""
ticket = self.create_ticket(summary, **kw)
ticket.insert()
return ticket
def create_wiki(self, name, text, **kw):
page = WikiPage(self.env, name)
page.text = text
for k, v in kw.items():
page[k] = v
return page
def insert_wiki(self, name, text = None, **kw):
text = text or "Dummy text"
page = self.create_wiki(name, text, **kw)
page.save("dummy author", "dummy comment", "::1")
return page
def add_attachment(self, resource, path):
resource = '%s:%s' % (resource.realm, resource.id)
AttachmentAdmin(self.env)._do_add(resource, path)
@contextmanager
def product(self, prefix):
old_env = self.env
self.env = ProductEnvironment(self.env, prefix)
yield
self.env = old_env
class DummyPlugin(Component):
implements(IEnvironmentSetupParticipant)
version = 1
def environment_created(self):
with self.env.db_transaction as db:
self.upgrade_environment(db)
def environment_needs_upgrade(self, db):
return self.get_version(db) < self.version
def upgrade_environment(self, db):
old_version = current_version = self.get_version(db)
db_connector, dummy = DatabaseManager(self.env)._get_connector()
while current_version < self.version:
if current_version > 0:
db("CREATE TEMPORARY TABLE dummy_table_old AS "
"SELECT * FROM dummy_table")
db("DROP TABLE dummy_table")
table = self.construct_dummy_table(current_version+1)
for statement in db_connector.to_sql(table):
db(statement)
if current_version > 0:
cols = ['id'] + ['v%i' % (i+1)
for i in range(current_version+1)]
db("""INSERT INTO dummy_table (%s)
SELECT %s, '' FROM dummy_table_old
""" % (', '.join(cols), ', '.join(cols[:-1])))
db("DROP TABLE dummy_table_old")
current_version += 1
if current_version != old_version:
self.update_version(db, current_version)
def construct_dummy_table(self, n_custom_fields=1):
fields = [Column('id')] + [
Column('v%d' % (i+1)) for i in range(n_custom_fields)
]
return Table('dummy_table')[fields]
def get_version(self, db):
rows = db("SELECT value FROM system WHERE name = %s",
(self.__class__.__name__,))
return int(rows[0][0]) if rows else 0
def update_version(self, db, version):
old_version = self.get_version(db)
if old_version:
db("UPDATE system SET value=%s WHERE name=%s",
(version, self.__class__.__name__,))
else:
db("INSERT INTO system (name, value) VALUES ('%s','%s')"
% (self.__class__.__name__, version))
return version
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(EnvironmentUpgradeTestCase, 'test'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')