blob: 1e1c273456ab9354b525cc2fad78b171e84d1f73 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from datetime import timedelta
import collections
from pylons import tmpl_context as c, app_globals as g
from nose.tools import assert_equal, assert_in
from ming.orm import ThreadLocalORMSession
import mock
import bson
from alluratest.controller import setup_basic_test, setup_global_objects, REGISTRY
from allura import model as M
from allura.model.notification import MailFooter
from allura.lib import helpers as h
from allura.tests import decorators as td
from forgewiki import model as WM
class TestNotification(unittest.TestCase):
def setUp(self):
setup_basic_test()
self.setup_with_tools()
@td.with_wiki
def setup_with_tools(self):
setup_global_objects()
_clear_subscriptions()
_clear_notifications()
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
M.notification.MAILBOX_QUIESCENT=None # disable message combining
def test_subscribe_unsubscribe(self):
M.Mailbox.subscribe(type='direct')
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
subscriptions = M.Mailbox.query.find(dict(
project_id=c.project._id,
app_config_id=c.app.config._id,
user_id=c.user._id)).all()
assert len(subscriptions) == 1
assert subscriptions[0].type=='direct'
assert M.Mailbox.query.find().count() == 1
M.Mailbox.unsubscribe()
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
subscriptions = M.Mailbox.query.find(dict(
project_id=c.project._id,
app_config_id=c.app.config._id,
user_id=c.user._id)).all()
assert len(subscriptions) == 0
assert M.Mailbox.query.find().count() == 0
@mock.patch('allura.tasks.mail_tasks.sendmail')
def test_send_direct(self, sendmail):
c.user = M.User.query.get(username='test-user')
wiki = c.project.app_instance('wiki')
page = WM.Page.query.get(app_config_id=wiki.config._id)
notification = M.Notification(
_id='_id',
ref=page.ref,
from_address='from_address',
reply_to_address='reply_to_address',
in_reply_to='in_reply_to',
subject='subject',
text='text',
)
notification.footer = lambda: ' footer'
notification.send_direct(c.user._id)
sendmail.post.assert_called_once_with(
destinations=[str(c.user._id)],
fromaddr='from_address',
reply_to='reply_to_address',
subject='subject',
message_id='_id',
in_reply_to='in_reply_to',
text='text footer',
)
@mock.patch('allura.tasks.mail_tasks.sendmail')
def test_send_direct_no_access(self, sendmail):
c.user = M.User.query.get(username='test-user')
wiki = c.project.app_instance('wiki')
page = WM.Page.query.get(app_config_id=wiki.config._id)
page.parent_security_context().acl = []
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
notification = M.Notification(
_id='_id',
ref=page.ref,
from_address='from_address',
reply_to_address='reply_to_address',
in_reply_to='in_reply_to',
subject='subject',
text='text',
)
notification.footer = lambda: ' footer'
notification.send_direct(c.user._id)
assert_equal(sendmail.post.call_count, 0)
@mock.patch('allura.tasks.mail_tasks.sendmail')
def test_send_direct_wrong_project_context(self, sendmail):
"""
Test that Notification.send_direct() works as expected even
if c.project is wrong.
This can happen when a notify task is triggered on project A (thus
setting c.project to A) and then calls Mailbox.fire_ready() which fires
pending Notifications on any waiting Mailbox, regardless of project,
but doesn't update c.project.
"""
project1 = c.project
project2 = M.Project.query.get(shortname='test2')
assert_equal(project1.shortname, 'test')
c.user = M.User.query.get(username='test-user')
wiki = project1.app_instance('wiki')
page = WM.Page.query.get(app_config_id=wiki.config._id)
notification = M.Notification(
_id='_id',
ref=page.ref,
from_address='from_address',
reply_to_address='reply_to_address',
in_reply_to='in_reply_to',
subject='subject',
text='text',
)
notification.footer = lambda: ' footer'
c.project = project2
notification.send_direct(c.user._id)
sendmail.post.assert_called_once_with(
destinations=[str(c.user._id)],
fromaddr='from_address',
reply_to='reply_to_address',
subject='subject',
message_id='_id',
in_reply_to='in_reply_to',
text='text footer',
)
class TestPostNotifications(unittest.TestCase):
def setUp(self):
setup_basic_test()
self.setup_with_tools()
@td.with_wiki
def setup_with_tools(self):
setup_global_objects()
g.set_app('wiki')
_clear_subscriptions()
_clear_notifications()
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
M.notification.MAILBOX_QUIESCENT=None # disable message combining
while M.MonQTask.run_ready('setup'):
ThreadLocalORMSession.flush_all()
def test_post_notification(self):
self._post_notification()
ThreadLocalORMSession.flush_all()
M.MonQTask.list()
t = M.MonQTask.get()
assert t.args[1] == self.pg.index_id()
def test_post_user_notification(self):
u = M.User.query.get(username='test-admin')
n = M.Notification.post_user(u, self.pg, 'metadata')
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
flash_msgs = list(h.pop_user_notifications(u))
assert len(flash_msgs) == 1, flash_msgs
msg = flash_msgs[0]
assert msg['text'].startswith('Home modified by Test Admin')
assert msg['subject'].startswith('[test:wiki]')
flash_msgs = list(h.pop_user_notifications(u))
assert not flash_msgs, flash_msgs
def test_delivery(self):
self._subscribe()
self._post_notification()
M.MonQTask.run_ready()
ThreadLocalORMSession.flush_all()
M.MonQTask.run_ready()
ThreadLocalORMSession.flush_all()
assert M.Mailbox.query.find().count()==1
mbox = M.Mailbox.query.get()
assert len(mbox.queue) == 1
assert not mbox.queue_empty
def test_email(self):
self._subscribe() # as current user: test-admin
user2 = M.User.query.get(username='test-user-2')
self._subscribe(user=user2)
self._post_notification()
ThreadLocalORMSession.flush_all()
assert_equal(M.Notification.query.get()['from_address'], '"Test Admin" <test-admin@users.localhost>')
assert_equal(M.Mailbox.query.find().count(), 2)
M.MonQTask.run_ready() # sends the notification out into "mailboxes", and from mailboxes into email tasks
mboxes = M.Mailbox.query.find().all()
assert_equal(len(mboxes), 2)
assert_equal(len(mboxes[0].queue), 1)
assert not mboxes[0].queue_empty
assert_equal(len(mboxes[1].queue), 1)
assert not mboxes[1].queue_empty
email_tasks = M.MonQTask.query.find({'state': 'ready'}).all()
assert_equal(len(email_tasks), 2) # make sure both subscribers will get an email
first_destinations = [e.kwargs['destinations'][0] for e in email_tasks]
assert_in(str(c.user._id), first_destinations)
assert_in(str(user2._id), first_destinations)
assert_equal(email_tasks[0].kwargs['fromaddr'], '"Test Admin" <test-admin@users.localhost>')
assert_equal(email_tasks[1].kwargs['fromaddr'], '"Test Admin" <test-admin@users.localhost>')
assert email_tasks[0].kwargs['text'].startswith('Home modified by Test Admin')
assert 'you indicated interest in ' in email_tasks[0].kwargs['text']
def test_permissions(self):
# Notification should only be delivered if user has read perms on the
# artifact. The perm check happens just before the mail task is
# posted.
u = M.User.query.get(username='test-admin')
self._subscribe(user=u)
# Simulate a permission check failure.
def patched_has_access(*args, **kw):
def predicate(*args, **kw):
return False
return predicate
from allura.model.notification import security
orig = security.has_access
security.has_access = patched_has_access
try:
# this will create a notification task
self._post_notification()
ThreadLocalORMSession.flush_all()
# running the notification task will create a mail task if the
# permission check passes...
M.MonQTask.run_ready()
ThreadLocalORMSession.flush_all()
# ...but in this case it doesn't create a mail task since we
# forced the perm check to fail
assert M.MonQTask.get() == None
finally:
security.has_access = orig
def test_footer(self):
footer = MailFooter.monitored(
'test@mail.com',
'http://test1.com',
'http://test2.com')
assert 'test@mail.com is subscribed to http://test1.com' in footer
assert 'admin can change settings at http://test2.com' in footer
footer = MailFooter.standard(M.Notification())
assert 'Sent from sourceforge.net because you indicated interest in' in footer
def _subscribe(self, **kw):
self.pg.subscribe(type='direct', **kw)
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
def _post_notification(self):
return M.Notification.post(self.pg, 'metadata')
class TestSubscriptionTypes(unittest.TestCase):
def setUp(self):
setup_basic_test()
self.setup_with_tools()
@td.with_wiki
def setup_with_tools(self):
setup_global_objects()
g.set_app('wiki')
_clear_subscriptions()
_clear_notifications()
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
M.notification.MAILBOX_QUIESCENT=None # disable message combining
def test_direct_sub(self):
self._subscribe()
self._post_notification(text='A')
self._post_notification(text='B')
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
M.Mailbox.fire_ready()
def test_digest_sub(self):
self._subscribe(type='digest')
self._post_notification(text='x'*1024)
self._post_notification()
M.Mailbox.fire_ready()
def test_summary_sub(self):
self._subscribe(type='summary')
self._post_notification(text='x'*1024)
self._post_notification()
M.Mailbox.fire_ready()
def test_message(self):
self._test_message()
self.setUp()
self._test_message()
self.setUp()
M.notification.MAILBOX_QUIESCENT=timedelta(minutes=1)
# will raise "assert msg is not None" since the new message is not 1 min old:
self.assertRaises(AssertionError, self._test_message)
def _test_message(self):
self._subscribe()
thd = M.Thread.query.get(ref_id=self.pg.index_id())
thd.post('This is a very cool message')
M.MonQTask.run_ready()
ThreadLocalORMSession.flush_all()
M.Mailbox.fire_ready()
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
msg = M.MonQTask.query.get(
task_name='allura.tasks.mail_tasks.sendmail',
state='ready')
assert msg is not None
assert 'Home@wiki.test.p' in msg.kwargs['reply_to']
u = M.User.by_username('test-admin')
assert str(u._id) in msg.kwargs['fromaddr'], msg.kwargs['fromaddr']
def _clear_subscriptions(self):
M.Mailbox.query.remove({})
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
def _subscribe(self, type='direct', topic=None):
self.pg.subscribe(type=type, topic=topic)
ThreadLocalORMSession.flush_all()
ThreadLocalORMSession.close_all()
def _post_notification(self, text=None):
return M.Notification.post(self.pg, 'metadata', text=text)
@mock.patch('allura.model.notification.defaultdict')
@mock.patch('allura.model.notification.Notification')
def test_direct_accumulation(self, mocked_notification, mocked_defaultdict):
class OrderedDefaultDict(collections.OrderedDict):
def __init__(self, factory=list, *a, **kw):
self._factory = factory
super(OrderedDefaultDict, self).__init__(*a, **kw)
def __getitem__(self, key):
if key not in self:
value = self[key] = self._factory()
else:
value = super(OrderedDefaultDict, self).__getitem__(key)
return value
notifications = mocked_notification.query.find.return_value.all.return_value = [
mock.Mock(_id='n0', topic='metadata', subject='s1', from_address='f1', reply_to_address='rt1', author_id='a1'),
mock.Mock(_id='n1', topic='metadata', subject='s2', from_address='f2', reply_to_address='rt2', author_id='a2'),
mock.Mock(_id='n2', topic='metadata', subject='s2', from_address='f2', reply_to_address='rt2', author_id='a2'),
mock.Mock(_id='n3', topic='message', subject='s3', from_address='f3', reply_to_address='rt3', author_id='a3'),
mock.Mock(_id='n4', topic='message', subject='s3', from_address='f3', reply_to_address='rt3', author_id='a3'),
]
mocked_defaultdict.side_effect = OrderedDefaultDict
u0 = bson.ObjectId()
mbox = M.Mailbox(type='direct', user_id=u0, queue=['n0', 'n1', 'n2', 'n3', 'n4'])
mbox.fire('now')
mocked_notification.query.find.assert_called_once_with({'_id': {'$in': ['n0', 'n1', 'n2', 'n3', 'n4']}})
# first notification should be sent direct, as its key values are unique
notifications[0].send_direct.assert_called_once_with(u0)
# next two notifications should be sent as a digest as they have matching key values
mocked_notification.send_digest.assert_called_once_with(u0, 'f2', 's2', [notifications[1], notifications[2]], 'rt2')
# final two should be sent direct even though they matching keys, as they are messages
notifications[3].send_direct.assert_called_once_with(u0)
notifications[4].send_direct.assert_called_once_with(u0)
def test_send_direct_disabled_user(self):
user = M.User.by_username('test-admin')
thd = M.Thread.query.get(ref_id=self.pg.index_id())
notification = M.Notification()
notification.ref_id = thd.index_id()
user.disabled = True
ThreadLocalORMSession.flush_all()
notification.send_direct(user._id)
count = M.MonQTask.query.find(dict(
task_name='allura.tasks.mail_tasks.sendmail',
state='ready')).count()
assert_equal(count, 0)
user.disabled = False
ThreadLocalORMSession.flush_all()
notification.send_direct(user._id)
count = M.MonQTask.query.find(dict(
task_name='allura.tasks.mail_tasks.sendmail',
state='ready')).count()
assert_equal(count, 1)
@mock.patch('allura.model.notification.Notification.ref')
def test_send_digest_disabled_user(self, ref):
thd = M.Thread.query.get(ref_id=self.pg.index_id())
notification = M.Notification()
notification.ref_id = thd.index_id()
ref.artifact = thd
user = M.User.by_username('test-admin')
user.disabled = True
ThreadLocalORMSession.flush_all()
M.Notification.send_digest(user._id, 'test@mail.com', 'subject', [notification])
count = M.MonQTask.query.find(dict(
task_name='allura.tasks.mail_tasks.sendmail',
state='ready')).count()
assert_equal(count, 0)
user.disabled = False
ThreadLocalORMSession.flush_all()
M.Notification.send_digest(user._id, 'test@mail.com', 'subject', [notification])
count = M.MonQTask.query.find(dict(
task_name='allura.tasks.mail_tasks.sendmail',
state='ready')).count()
assert_equal(count, 1)
def _clear_subscriptions():
M.Mailbox.query.remove({})
def _clear_notifications():
M.Notification.query.remove({})