# blob: b2148af3092135bb2e6082f6c7c39b1bfba1a665 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Tests for Apache(TM) Bloodhound's tickets model in product environments"""
from datetime import datetime
import shutil
import unittest
from trac.ticket.model import Milestone, Ticket
from trac.ticket.tests.model import TicketTestCase, TicketCommentTestCase, \
TicketCommentEditTestCase, TicketCommentDeleteTestCase, EnumTestCase, \
MilestoneTestCase, ComponentTestCase, VersionTestCase
from trac.util.datefmt import to_utimestamp, utc
from multiproduct.model import Product
from multiproduct.env import ProductEnvironment
from tests.env import MultiproductTestCase
try:
import threading
except ImportError:
threading = None
from Queue import Queue
class ProductTicketTestCase(TicketTestCase, MultiproductTestCase):
    """Run the inherited ticket model tests inside a product environment.

    Also checks how ticket ids and uids are allocated when tickets are
    inserted into more than one product.
    """

    def setUp(self):
        """Create the product environment and declare custom fields."""
        self._mp_setup()
        global_env = self.env
        self.global_env = global_env
        self.env = ProductEnvironment(global_env, self.default_product)
        self._load_default_data(self.env)
        custom_fields = (
            ('foo', 'text'),
            ('cbon', 'checkbox'),
            ('cboff', 'checkbox'),
        )
        for field_name, field_type in custom_fields:
            self.env.config.set('ticket-custom', field_name, field_type)

    def tearDown(self):
        """Reset the global database and drop both environment references."""
        self.global_env.reset_db()
        self.env = None
        self.global_env = None

    def _get_ticket_uid(self, tid):
        """Return the ``uid`` column of ticket `tid`, or -1 if not found."""
        with self.env.db_query as db:
            found = db("""SELECT uid FROM ticket WHERE id=%s""", (tid, ))
        if not found:
            return -1
        return found[0][0]

    def test_insert_into_multiple_products(self):
        """Ticket ids restart per product while uids keep increasing."""
        # UIDs are global, autoincremented
        # IDs are product-scoped, incremented in the SQL translator

        def insert_and_check(summary, expected_id, expected_uid):
            # Insert into whatever product self.env currently points at,
            # then verify the product-scoped id and the global uid.
            new_id = self._insert_ticket(summary, reporter='admin')
            loaded = Ticket(self.env, new_id)
            self.assertEqual(new_id, expected_id)
            self.assertEqual(self._get_ticket_uid(new_id), expected_uid)
            self.assertEqual(loaded.id, new_id)

        self.env = ProductEnvironment(self.global_env, self.default_product)
        insert_and_check('hello kitty', 1, 1)
        insert_and_check('hello kitteh', 2, 2)

        second_product = Product(self.global_env)
        second_product.prefix = 'p2'
        second_product.name = 'product, too'
        second_product.owner = 'admin'
        second_product.insert()

        self.env = ProductEnvironment(self.global_env, second_product)
        insert_and_check('hello catty', 1, 3)
        insert_and_check('hello ocelot', 2, 4)
class ProductTicketCommentTestCase(MultiproductTestCase):
    """Base for comment tests exposing a lazily built product ``env``."""

    @property
    def env(self):
        """Return the product environment, creating it on first access."""
        existing = getattr(self, '_env', None)
        if existing is not None:
            return existing

        # First access: build and upgrade the global environment, load the
        # default product, then wrap it in a product environment.
        self.global_env = self._setup_test_env()
        self._upgrade_mp(self.global_env)
        self._setup_test_log(self.global_env)
        self._load_product_from_data(self.global_env, self.default_product)
        product_env = ProductEnvironment(self.global_env,
                                         self.default_product)
        self._env = product_env
        self._load_default_data(product_env)
        return product_env

    @env.setter
    def env(self, value):
        """Ignore assignments; the environment is only built by the getter.

        NOTE(review): presumably this keeps ``self.env = ...`` statements in
        the inherited test cases from replacing the product environment --
        confirm against the upstream setUp methods.
        """

    def tearDown(self):
        """Reset the global database and drop the cached environments."""
        self.global_env.reset_db()
        self._env = None
        self.global_env = None
class ProductTicketCommentEditTestCase(TicketCommentEditTestCase,
                                       ProductTicketCommentTestCase):
    """Combine TicketCommentEditTestCase with the product ``env`` base.

    Adds no tests of its own; everything is inherited from the two bases.
    """
class ProductTicketCommentDeleteTestCase(TicketCommentDeleteTestCase,
                                         ProductTicketCommentTestCase):
    """Combine TicketCommentDeleteTestCase with the product ``env`` base.

    Adds no tests of its own; everything is inherited from the two bases.
    """
class ProductEnumTestCase(EnumTestCase, MultiproductTestCase):
    """Run the inherited EnumTestCase tests against a product environment."""

    def setUp(self):
        """Swap the global environment for one scoped to the default product."""
        self._mp_setup()
        global_env = self.env
        self.global_env = global_env
        self.env = ProductEnvironment(global_env, self.default_product)
        self._load_default_data(self.env)

    def tearDown(self):
        """Reset the global database and drop both environment references."""
        self.global_env.reset_db()
        self.env = None
        self.global_env = None
class ProductMilestoneTestCase(MilestoneTestCase, MultiproductTestCase):
    """Run the inherited milestone tests against a product environment.

    The global environment is created with an on-disk folder
    (``create_folder=True``), which tearDown removes again.
    """

    def setUp(self):
        """Build the global environment on disk and wrap it per product."""
        self.global_env = self._setup_test_env(create_folder=True)
        self._upgrade_mp(self.global_env)
        self._setup_test_log(self.global_env)
        self._load_product_from_data(self.global_env, self.default_product)
        self.env = ProductEnvironment(self.global_env, self.default_product)
        self._load_default_data(self.env)

    def tearDown(self):
        """Remove the environment folder and reset the database.

        The database reset and reference cleanup run even if removing the
        folder fails, so a filesystem error cannot leak state into the next
        test; the original exception still propagates.
        """
        try:
            shutil.rmtree(self.global_env.path)
        finally:
            self.global_env.reset_db()
            self.env = self.global_env = None

    @unittest.skipUnless(threading, 'Threading required for test')
    def test_milestone_threads(self):
        """ Ensure that in threaded (e.g. mod_wsgi) situations, we get
        an accurate list of milestones from Milestone.list

        The basic strategy is:
            thread-1 requests a list of milestones
            thread-2 adds a milestone
            thread-1 requests a new list of milestones
        To pass, thread-1 should have a list of milestones that matches
        those that are in the database.
        """
        lock = threading.RLock()
        results = []
        # two events to coordinate the workers and ensure that the threads
        # alternate appropriately
        e1 = threading.Event()
        e2 = threading.Event()

        def task(add):
            """the thread task - either we are discovering or adding
            milestones
            """
            with lock:
                env = ProductEnvironment(self.global_env,
                                         self.default_product)
                if add:
                    name = 'milestone_from_' + threading.current_thread().name
                    milestone = Milestone(env)
                    milestone.name = name
                    milestone.insert()
                else:
                    # collect the names of milestones reported by Milestone
                    # and directly from the db - as sets to ease comparison
                    # later
                    results.append({
                        'from_t': {m.name for m in Milestone.select(env)},
                        'from_db': {
                            v[0] for v in self.env.db_query(
                                "SELECT name FROM milestone")}})

        def worker1():
            """ check milestones in this thread twice either side of ceding
            control to worker2
            """
            try:
                task(False)
            finally:
                # always release worker2, otherwise a failure here would
                # leave it blocked in e1.wait() and hang the join below
                e1.set()
            e2.wait()
            task(False)

        def worker2():
            """ adds a milestone when worker1 allows us to then cede control
            back to worker1
            """
            e1.wait()
            try:
                task(True)
            finally:
                # always hand control back, otherwise a failed insert would
                # leave worker1 blocked in e2.wait() forever
                e2.set()

        t1, t2 = [threading.Thread(target=f) for f in (worker1, worker2)]
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        r = results[-1]  # note we only care about the final result
        self.assertEqual(r['from_t'], r['from_db'])

    def test_update_milestone(self):
        """Updating a milestone persists its due date, completion date and
        description in the row read back from the database.
        """
        self.env.db_transaction("INSERT INTO milestone (name) VALUES ('Test')")
        milestone = Milestone(self.env, 'Test')
        # plain decimal literals: a leading zero (01, 02) is an octal literal
        # in Python 2 and a syntax error in Python 3
        t1 = datetime(2001, 1, 1, tzinfo=utc)
        t2 = datetime(2002, 2, 2, tzinfo=utc)
        milestone.due = t1
        milestone.completed = t2
        milestone.description = 'Foo bar'
        milestone.update()
        self.assertEqual(
            [('Test', to_utimestamp(t1), to_utimestamp(t2), 'Foo bar',
              self.default_product)],
            self.env.db_query("SELECT * FROM milestone WHERE name='Test'"))
class ProductComponentTestCase(ComponentTestCase, MultiproductTestCase):
    """Run the inherited ComponentTestCase tests in a product environment."""

    def setUp(self):
        """Swap the global environment for one scoped to the default product."""
        self._mp_setup()
        global_env = self.env
        self.global_env = global_env
        self.env = ProductEnvironment(global_env, self.default_product)
        self._load_default_data(self.env)

    def tearDown(self):
        """Reset the global database and drop both environment references."""
        self.global_env.reset_db()
        self.env = None
        self.global_env = None
class ProductVersionTestCase(VersionTestCase, MultiproductTestCase):
    """Run the inherited VersionTestCase tests in a product environment."""

    def setUp(self):
        """Swap the global environment for one scoped to the default product."""
        self._mp_setup()
        global_env = self.env
        self.global_env = global_env
        self.env = ProductEnvironment(global_env, self.default_product)
        self._load_default_data(self.env)

    def tearDown(self):
        """Reset the global database and drop both environment references."""
        self.global_env.reset_db()
        self.env = None
        self.global_env = None
def test_suite():
    """Return one suite holding the ``test*`` methods of every product
    test case in this module, in declaration order.
    """
    case_classes = (
        ProductTicketTestCase,
        ProductTicketCommentEditTestCase,
        ProductTicketCommentDeleteTestCase,
        ProductEnumTestCase,
        ProductMilestoneTestCase,
        ProductComponentTestCase,
        ProductVersionTestCase,
    )
    combined = unittest.TestSuite()
    for case_class in case_classes:
        combined.addTest(unittest.makeSuite(case_class, 'test'))
    return combined
# When executed as a script, run the suite returned by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')