| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import calendar |
| from datetime import datetime, time, timedelta |
| import json |
| from urlparse import urlparse, parse_qs |
| from urllib import urlencode |
| from bson import ObjectId |
| |
| import re |
| from ming.orm.ormsession import ThreadLocalORMSession, session |
| from tg import config, expose |
| from mock import patch |
| import mock |
| from nose.tools import ( |
| assert_equal, |
| assert_not_equal, |
| assert_is_none, |
| assert_is_not_none, |
| assert_in, |
| assert_not_in, |
| assert_true, |
| assert_false, |
| ) |
| from pylons import tmpl_context as c |
| from webob import exc |
| |
| from allura.tests import TestController |
| from allura.tests import decorators as td |
| from alluratest.controller import setup_trove_categories |
| from allura import model as M |
| from allura.lib import plugin |
| from allura.lib import helpers as h |
| |
| |
def unentity(s):
    """Return *s* with HTML ``&quot;`` entities turned back into double quotes.

    Lets a test compare an HTML-escaped response body against plain
    expected text that contains literal ``"`` characters.
    """
    # Replacing '"' with '"' would be a no-op; the entity is what must go.
    return s.replace('&quot;', '"')
| |
| |
| class TestAuth(TestController): |
def test_login(self):
    """Exercise verification links, address verification and failed logins."""
    link_url = '/auth/send_verification_link'
    verify_url = '/auth/verify_addr'
    login_url = '/auth/do_login'

    self.app.get('/auth/')
    self.app.post(link_url, params={'a': 'test@example.com'})
    admin_addr = M.User.query.get(username='test-admin').email_addresses[0]
    self.app.post(link_url, params={'a': admin_addr})
    ThreadLocalORMSession.flush_all()

    # an arbitrary string is not an acceptable nonce
    resp = self.app.get(verify_url, params={'a': 'foo'})
    assert json.loads(self.webflash(resp))['status'] == 'error', self.webflash(resp)

    # the nonce stored on an EmailAddress record is accepted
    stored = M.EmailAddress.query.find().first()
    resp = self.app.get(verify_url, params={'a': stored.nonce})
    assert json.loads(self.webflash(resp))['status'] == 'ok', self.webflash(resp)

    self.app.get('/auth/logout')
    self.app.post(login_url, params={'username': 'test-user', 'password': 'foo'})

    # wrong password
    resp = self.app.post(login_url, params={'username': 'test-user', 'password': 'food'})
    assert 'Invalid login' in str(resp), resp.showbrowser()

    # unknown username
    resp = self.app.post(login_url, params={'username': 'test-usera', 'password': 'foo'})
    assert 'Invalid login' in str(resp), resp.showbrowser()
| |
def test_track_login(self):
    """Logging in records date, IP and user agent in ``last_access``."""
    before = M.User.by_username('test-user')
    for key in ('login_date', 'login_ip', 'login_ua'):
        assert_equal(before.last_access[key], None)

    credentials = {'username': 'test-user', 'password': 'foo'}
    self.app.post('/auth/do_login',
                  headers={'User-Agent': 'browser'},
                  extra_environ={'REMOTE_ADDR': 'addr'},
                  params=credentials)

    after = M.User.by_username('test-user')
    assert_not_equal(after.last_access['login_date'], None)
    assert_equal(after.last_access['login_ip'], 'addr')
    assert_equal(after.last_access['login_ua'], 'browser')
| |
def test_rememberme(self):
    """The 'rememberme' flag decides whether the session cookie expires."""
    expected_name = M.User.query.get(username='test-user').username
    credentials = dict(username='test-user', password='foo')

    # Remember-me off: session is flagged to expire, cookie has no 'expires'
    resp = self.app.post('/auth/do_login', params=credentials)
    assert_equal(resp.session['username'], expected_name)
    assert_equal(resp.session['login_expires'], True)
    cookies = [value for name, value in resp.headerlist if name == 'Set-cookie']
    for cookie in cookies:
        assert_not_in('expires', cookie)

    # Remember-me on: session flag differs, cookie carries 'expires'
    resp = self.app.post('/auth/do_login',
                         params=dict(credentials, rememberme='on'))
    assert_equal(resp.session['username'], expected_name)
    assert_not_equal(resp.session['login_expires'], True)
    cookies = [value for name, value in resp.headerlist if name == 'Set-cookie']
    for cookie in cookies:
        assert_in('expires', cookie)
| |
@td.with_user_project('test-admin')
def test_user_can_not_claim_duplicate_emails(self):
    """Claiming the same address twice yields an error and no second record."""
    addr = 'test_abcd_123@domain.net'
    admin = M.User.query.get(username='test-admin')
    count_before = len(admin.email_addresses)
    claim_form = {
        'new_addr.addr': addr,
        'new_addr.claim': 'Claim Address',
        'primary_addr': 'test-admin@users.localhost',
        'preferences.email_format': 'plain',
        'password': 'foo',
    }

    def submit_claim():
        return self.app.post('/auth/preferences/update_emails',
                             params=claim_form,
                             extra_environ=dict(username='test-admin'))

    def claims_by_admin():
        query = dict(email=addr, claimed_by_user_id=admin._id)
        return M.EmailAddress.query.find(query).count()

    # first claim succeeds
    submit_claim()
    assert claims_by_admin() == 1

    # second claim of the same address is refused
    resp = submit_claim()
    assert json.loads(self.webflash(resp))['status'] == 'error', self.webflash(resp)
    assert claims_by_admin() == 1
    refreshed = M.User.query.get(username='test-admin')
    assert len(refreshed.email_addresses) == count_before + 1
| |
@td.with_user_project('test-admin')
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_user_added_claimed_address_by_other_user_confirmed(self, gen_message_id, sendsimplemail):
    """Claim an address that another user has already claimed and confirmed.

    test-user owns a confirmed claim on the address.  When test-admin claims
    the same address, the flash reports success, the address is added to
    test-admin's list, a second EmailAddress record exists, and exactly one
    "claim attempt" mail naming test-user's account is posted to the address.
    """
    email_address = 'test_abcd_123@domain.net'

    # test-user claimed & confirmed email address
    user = M.User.query.get(username='test-user')
    user.claim_address(email_address)
    email = M.EmailAddress.query.find(dict(email=email_address)).first()
    email.confirmed = True
    ThreadLocalORMSession.flush_all()

    # Claiming the same email address by test-admin:
    # the email should be added to the email_addresses list and a single
    # "claim attempt" notification should be sent to the address
    admin = M.User.query.get(username='test-admin')
    addresses_number = len(admin.email_addresses)
    r = self.app.post('/auth/preferences/update_emails',
                      params={
                          'new_addr.addr': email_address,
                          'new_addr.claim': 'Claim Address',
                          'primary_addr': 'test-admin@users.localhost',
                          'preferences.email_format': 'plain',
                          'password': 'foo',
                      },
                      extra_environ=dict(username='test-admin'))

    flash = json.loads(self.webflash(r))
    assert flash['status'] == 'ok'
    assert flash['message'] == 'A verification email has been sent.  ' \
        'Please check your email and click to confirm.'

    # Assert the call count first: call_args is None on a mock that was never
    # called, and unpacking it would raise TypeError instead of failing cleanly.
    assert sendsimplemail.post.call_count == 1
    _, kwargs = sendsimplemail.post.call_args
    assert kwargs['toaddr'] == email_address
    assert kwargs['subject'] == u'%s - Email address claim attempt' % config['site_name']
    assert "You tried to add %s to your Allura account, " \
           "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text']

    assert len(M.User.query.get(username='test-admin').email_addresses) == addresses_number + 1
    assert len(M.EmailAddress.query.find(dict(email=email_address)).all()) == 2
| |
@td.with_user_project('test-admin')
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_user_added_claimed_address_by_other_user_not_confirmed(self, gen_message_id, sendsimplemail):
    """Claim an address that another user has claimed but not confirmed.

    test-user holds an unconfirmed claim on the address.  When test-user-1
    claims it too, the flash reports success, a mail is posted, the address
    is added to test-user-1's list and two EmailAddress records exist.
    """
    email_address = 'test_abcd_1235@domain.net'

    # test-user claimed email address (left unconfirmed)
    user = M.User.query.get(username='test-user')
    user.claim_address(email_address)
    email = M.EmailAddress.query.find(dict(email=email_address)).first()
    email.confirmed = False
    ThreadLocalORMSession.flush_all()

    # Claiming the same email address by test-user-1:
    # the email should be added to the email_addresses list and a
    # verification mail should be sent
    user1 = M.User.query.get(username='test-user-1')
    addresses_number = len(user1.email_addresses)
    r = self.app.post('/auth/preferences/update_emails',
                      params={
                          'new_addr.addr': email_address,
                          'new_addr.claim': 'Claim Address',
                          'primary_addr': 'test-user-1@users.localhost',
                          'preferences.email_format': 'plain',
                          'password': 'foo',
                      },
                      extra_environ=dict(username='test-user-1'))

    flash = json.loads(self.webflash(r))
    assert flash['status'] == 'ok'
    assert flash['message'] == 'A verification email has been sent.  ' \
        'Please check your email and click to confirm.'
    assert sendsimplemail.post.called
    # Compare against the same user the baseline was taken from (test-user-1);
    # checking a different account's list would only pass by coincidence.
    assert len(M.User.query.get(username='test-user-1').email_addresses) == addresses_number + 1
    assert len(M.EmailAddress.query.find(dict(email=email_address)).all()) == 2
| |
@td.with_user_project('test-admin')
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_user_cannot_claim_more_than_max_limit(self, gen_message_id, sendsimplemail):
    """With the claim limit set to 1, a second claimed address is refused."""

    def claim(addr):
        # Submit the email preferences form as test-user-1, return the parsed flash
        resp = self.app.post('/auth/preferences/update_emails',
                             params={
                                 'new_addr.addr': addr,
                                 'new_addr.claim': 'Claim Address',
                                 'primary_addr': 'test-user-1@users.localhost',
                                 'preferences.email_format': 'plain',
                                 'password': 'foo',
                             },
                             extra_environ=dict(username='test-user-1'))
        return resp

    limit_override = {'user_prefs.maximum_claimed_emails': '1'}
    with h.push_config(config, **limit_override):
        first = claim('test_abcd_1@domain.net')
        assert json.loads(self.webflash(first))['status'] == 'ok'

        second = claim('test_abcd_2@domain.net')
        assert json.loads(self.webflash(second))['status'] == 'error'
        assert json.loads(self.webflash(second))['message'] == 'You cannot claim more than 1 email addresses.'
| |
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_verification_link_for_confirmed_email(self, gen_message_id, sendsimplemail):
    """Request a verification link for an address someone else confirmed.

    test-user holds a confirmed claim on the address and test-user-1 an
    unconfirmed one.  When test-user-1 asks for a verification link, the
    flash says 'Verification link sent' and exactly one "claim attempt" mail
    naming test-user's account is posted to the address.
    """
    email_address = 'test_abcd@domain.net'

    # test-user claimed & confirmed email address
    user = M.User.query.get(username='test-user')
    user.claim_address(email_address)
    email = M.EmailAddress.query.find(dict(email=email_address, claimed_by_user_id=user._id)).first()
    email.confirmed = True

    # test-user-1 claimed the same address, not confirmed
    user1 = M.User.query.get(username='test-user-1')
    user1.claim_address(email_address)
    email = M.EmailAddress.query.find(dict(email=email_address, claimed_by_user_id=user1._id)).first()
    email.confirmed = False

    ThreadLocalORMSession.flush_all()

    r = self.app.post('/auth/send_verification_link',
                      params=dict(a=email_address),
                      extra_environ=dict(username='test-user-1'))

    flash = json.loads(self.webflash(r))
    assert flash['status'] == 'ok'
    assert flash['message'] == 'Verification link sent'

    # Assert the call count first: call_args is None on a mock that was never
    # called, and unpacking it would raise TypeError instead of failing cleanly.
    assert sendsimplemail.post.call_count == 1
    _, kwargs = sendsimplemail.post.call_args
    assert kwargs['toaddr'] == email_address
    assert kwargs['subject'] == u'%s - Email address claim attempt' % config['site_name']
    assert "You tried to add %s to your Allura account, " \
           "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text']
| |
def test_invalidate_verification_link_if_email_was_confirmed(self):
    """A pending verification link fails once another user confirmed the address."""
    addr = 'test_abcd@domain.net'
    as_test_user = dict(username='test-user')

    def claim_record(owner):
        query = dict(email=addr, claimed_by_user_id=owner._id)
        return M.EmailAddress.query.find(query).first()

    # test-user claims the address, unconfirmed, and requests a link
    first_owner = M.User.query.get(username='test-user')
    first_owner.claim_address(addr)
    pending = claim_record(first_owner)
    pending.confirmed = False
    ThreadLocalORMSession.flush_all()

    self.app.post('/auth/send_verification_link',
                  params=dict(a=addr),
                  extra_environ=as_test_user)

    # meanwhile test-user-1 claims and confirms the same address
    second_owner = M.User.query.get(username='test-user-1')
    second_owner.claim_address(addr)
    confirmed = claim_record(second_owner)
    confirmed.confirmed = True
    ThreadLocalORMSession.flush_all()

    # Verify first email with the verification link
    resp = self.app.get('/auth/verify_addr',
                        params=dict(a=pending.nonce),
                        extra_environ=as_test_user)

    assert json.loads(self.webflash(resp))['status'] == 'error'
    pending = claim_record(first_owner)
    assert not pending.confirmed
| |
| |
@td.with_user_project('test-admin')
def test_prefs(self):
    """Add and delete addresses and rename the user, checking audit entries."""
    prefs_url = '/auth/preferences/'
    emails_url = '/auth/preferences/update_emails'
    as_admin = dict(username='test-admin')

    def primary_pref():
        return M.User.query.get(username='test-admin').get_pref('email_address')

    page = self.app.get(prefs_url, extra_environ=as_admin)
    # check preconditions of test data
    assert 'test@example.com' not in page
    assert 'test-admin@users.localhost' in page
    assert_equal(primary_pref(), 'test-admin@users.localhost')

    # add test@example
    add_form = {
        'new_addr.addr': 'test@example.com',
        'new_addr.claim': 'Claim Address',
        'primary_addr': 'test-admin@users.localhost',
        'password': 'foo',
        'preferences.email_format': 'plain'
    }
    with td.audits('New email address: test@example.com', user=True):
        self.app.post(emails_url, extra_environ=as_admin, params=add_form)
    page = self.app.get(prefs_url)
    assert 'test@example.com' in page
    assert_equal(primary_pref(), 'test-admin@users.localhost')

    # remove test-admin@users.localhost
    delete_form = {
        'addr-1.ord': '1',
        'addr-1.delete': 'on',
        'addr-2.ord': '2',
        'new_addr.addr': '',
        'primary_addr': 'test-admin@users.localhost',
        'password': 'foo',
        'preferences.email_format': 'plain'
    }
    with td.audits('Email address deleted: test-admin@users.localhost', user=True):
        self.app.post(emails_url, extra_environ=as_admin, params=delete_form)
    page = self.app.get(prefs_url)
    assert 'test-admin@users.localhost' not in page
    # with the old primary deleted, the stored preference is expected to be None
    assert_equal(primary_pref(), None)

    with td.audits('Display Name changed Test Admin => Admin', user=True):
        self.app.post('/auth/preferences/update',
                      params={'preferences.display_name': 'Admin'},
                      extra_environ=as_admin)
| |
@td.with_user_project('test-admin')
def test_email_prefs_change_requires_password(self):
    """Claiming, promoting and deleting an address each need the password.

    Every action is tried with no password, with a wrong password and
    finally with the valid one; only the last attempt may take effect.
    """
    claimed = 'test@example.com'
    original_primary = 'test-admin@users.localhost'

    def submit(form):
        return self.app.post('/auth/preferences/update_emails',
                             params=form,
                             extra_environ=dict(username='test-admin'))

    def primary_pref():
        return M.User.by_username('test-admin').get_pref('email_address')

    # Claim new email
    claim_error = 'You must provide your current password to claim new email'
    claim_form = {
        'new_addr.addr': 'test@example.com',
        'new_addr.claim': 'Claim Address',
        'primary_addr': 'test-admin@users.localhost',
    }
    resp = submit(claim_form)
    assert_in(claim_error, self.webflash(resp))
    assert_not_in(claimed, resp.follow())

    claim_form['password'] = 'bad pass'
    resp = submit(claim_form)
    assert_in(claim_error, self.webflash(resp))
    assert_not_in(claimed, resp.follow())

    claim_form['password'] = 'foo'  # valid password
    resp = submit(claim_form)
    assert_not_in(claim_error, self.webflash(resp))
    assert_in(claimed, resp.follow())

    # Change primary address
    primary_error = 'You must provide your current password to change primary address'
    primary_form = {
        'new_addr.addr': '',
        'primary_addr': 'test@example.com',
    }
    resp = submit(primary_form)
    assert_in(primary_error, self.webflash(resp))
    assert_equal(primary_pref(), original_primary)

    primary_form['password'] = 'bad pass'
    resp = submit(primary_form)
    assert_in(primary_error, self.webflash(resp))
    assert_equal(primary_pref(), original_primary)

    primary_form['password'] = 'foo'  # valid password
    resp = submit(primary_form)
    assert_not_in(primary_error, self.webflash(resp))
    assert_equal(primary_pref(), claimed)

    # Remove email
    delete_error = 'You must provide your current password to delete an email'
    delete_form = {
        'addr-1.ord': '1',
        'addr-2.ord': '2',
        'addr-2.delete': 'on',
        'new_addr.addr': '',
        'primary_addr': 'test-admin@users.localhost',
    }
    resp = submit(delete_form)
    assert_in(delete_error, self.webflash(resp))
    assert_in(claimed, resp.follow())

    delete_form['password'] = 'bad pass'
    resp = submit(delete_form)
    assert_in(delete_error, self.webflash(resp))
    assert_in(claimed, resp.follow())

    delete_form['password'] = 'foo'  # valid password
    resp = submit(delete_form)
    assert_not_in(delete_error, self.webflash(resp))
    assert_not_in(claimed, resp.follow())
| |
@td.with_user_project('test-admin')
def test_prefs_subscriptions(self):
    """The subscriptions page lists existing mailboxes and subscribable tools."""
    page = self.app.get('/auth/subscriptions/',
                        extra_environ=dict(username='test-admin'))
    mailbox_query = dict(user_id=c.user._id, is_flash=False)
    mailboxes = M.Mailbox.query.find(mailbox_query).all()
    # make sure page actually lists all the user's subscriptions
    assert len(mailboxes) > 0, \
        'Test user has no subscriptions, cannot verify that they are shown'
    for mailbox in mailboxes:
        assert mailbox._id in page, \
            "Page doesn't list subscription for Mailbox._id = %s" % mailbox._id

    # make sure page lists all tools which user can subscribe
    admin = M.User.query.get(username='test-admin')
    unsubscribed_tool_ids = [
        app_config._id
        for project in admin.my_projects()
        for app_config in project.app_configs
        if not M.Mailbox.subscribed(project_id=project._id,
                                    app_config_id=app_config._id)
    ]
    for tool_id in unsubscribed_tool_ids:
        assert tool_id in page, \
            "Page doesn't list tool with app_config_id = %s" % tool_id
| |
| def _find_subscriptions_form(self, r): |
| form = None |
| for f in r.forms.itervalues(): |
| if f.action == 'update_subscriptions': |
| form = f |
| break |
| assert form is not None, "Can't find subscriptions form" |
| return form |
| |
| def _find_subscriptions_field(self, form, subscribed=False): |
| field_name = None |
| for k, v in form.fields.iteritems(): |
| if subscribed: |
| check = c and v[0].value == 'on' |
| else: |
| check = c and v[0].value != 'on' |
| if k and k.endswith('.subscribed') and check: |
| field_name = k.replace('.subscribed', '') |
| assert field_name, "Can't find unsubscribed tool for user" |
| return field_name |
| |
@td.with_user_project('test-admin')
def test_prefs_subscriptions_subscribe(self):
    """Ticking an unsubscribed tool on the form creates a subscription."""
    page = self.app.get('/auth/subscriptions/',
                        extra_environ=dict(username='test-admin'))
    form = self._find_subscriptions_form(page)
    # find not subscribed tool, subscribe and verify
    prefix = self._find_subscriptions_field(form, subscribed=False)

    def first_field(suffix):
        return form.fields[prefix + suffix][0]

    tool_id = ObjectId(first_field('.tool_id').value)
    project_id = ObjectId(first_field('.project_id').value)
    lookup = dict(project_id=project_id, app_config_id=tool_id)

    assert not M.Mailbox.subscribed(**lookup), \
        "User already subscribed for tool %s" % tool_id
    first_field('.subscribed').value = 'on'
    form.submit()
    assert M.Mailbox.subscribed(**lookup), \
        "User is not subscribed for tool %s" % tool_id
| |
@td.with_user_project('test-admin')
def test_prefs_subscriptions_unsubscribe(self):
    """Clearing a subscribed tool on the form removes its Mailbox record."""
    page = self.app.get('/auth/subscriptions/',
                        extra_environ=dict(username='test-admin'))
    form = self._find_subscriptions_form(page)
    prefix = self._find_subscriptions_field(form, subscribed=True)

    raw_id = form.fields[prefix + '.subscription_id'][0].value
    mailbox_id = ObjectId(raw_id)
    mailbox = M.Mailbox.query.get(_id=mailbox_id)
    assert mailbox, "User has not subscription with Mailbox._id = %s" % mailbox_id

    checkbox = form.fields[prefix + '.subscribed'][0]
    checkbox.value = None
    form.submit()

    mailbox = M.Mailbox.query.get(_id=mailbox_id)
    assert not mailbox, "User still has subscription with Mailbox._id %s" % mailbox_id
| |
def test_format_email(self):
    """Each posted email format shows up as the selected option afterwards."""
    expectations = (
        ('html', '<option selected value="html">HTML</option>'),
        ('plain', '<option selected value="plain">Plain Text</option>'),
        ('both', '<option selected value="both">Combined</option>'),
    )
    for email_format, selected_option in expectations:
        self.app.post('/auth/subscriptions/update_subscriptions',
                      params={'email_format': email_format, 'subscriptions': ''})
        page = self.app.get('/auth/subscriptions/')
        assert selected_option in page
| |
def test_create_account(self):
    """Register a user, reject a duplicate, then log in as the new user."""
    save_url = '/auth/save_new'
    registration = dict(
        username='aaa',
        pw='12345678',
        pw2='12345678',
        display_name='Test Me')

    page = self.app.get('/auth/create_account')
    assert 'Create an Account' in page

    # too-short password is rejected by validation
    page = self.app.post(save_url, params=dict(username='aaa', pw='123'))
    assert 'Enter a value 6 characters long or more' in page

    # valid registration
    page = self.app.post(save_url, params=registration).follow()
    assert 'User "aaa" registered' in unentity(page.body)

    # same username again
    page = self.app.post(save_url, params=registration)
    assert 'That username is already taken. Please choose another.' in page

    self.app.get('/auth/logout')
    self.app.post(
        '/auth/do_login',
        params=dict(username='aaa', password='12345678'),
        status=302)
| |
def test_create_account_require_email(self):
    """New users are pending only when 'auth.require_email_addr' is 'true'."""
    scenarios = (
        # (config value, username to register, expected pending state)
        ('false', 'aaa', False),
        ('true', 'bbb', True),
    )
    for setting, username, expect_pending in scenarios:
        with h.push_config(config, **{'auth.require_email_addr': setting}):
            self.app.post(
                '/auth/save_new',
                params=dict(
                    username=username,
                    pw='12345678',
                    pw2='12345678',
                    display_name='Test Me',
                    email='test@example.com'))
            created = M.User.query.get(username=username)
            assert bool(created.pending) == expect_pending
| |
def test_verify_email(self):
    """Verifying the registration address confirms it and clears 'pending'."""
    addr = 'test@example.com'
    registration = dict(
        username='aaa',
        pw='12345678',
        pw2='12345678',
        display_name='Test Me',
        email=addr)

    with h.push_config(config, **{'auth.require_email_addr': 'true'}):
        self.app.post('/auth/save_new', params=registration).follow()

        new_user = M.User.query.get(username='aaa')
        record = M.EmailAddress.query.get(email=addr)
        assert new_user._id == record.claimed_by_user_id

        self.app.get('/auth/verify_addr', params=dict(a=record.nonce))

        # re-read both objects after the verification request
        new_user = M.User.query.get(username='aaa')
        record = M.EmailAddress.query.get(email=addr)
        assert not new_user.pending
        assert record.confirmed
| |
def test_create_account_disabled_header_link(self):
    """With registration disabled, the home page has no 'Register' text."""
    registration_off = {'auth.allow_user_registration': 'false'}
    with h.push_config(config, **registration_off):
        home = self.app.get('/')
        assert 'Register' not in home
| |
def test_create_account_disabled_form_gone(self):
    """With registration disabled, the create-account page answers 404."""
    registration_off = {'auth.allow_user_registration': 'false'}
    with h.push_config(config, **registration_off):
        page = self.app.get('/auth/create_account', status=404)
        assert 'Create an Account' not in page
| |
def test_create_account_disabled_submit_fails(self):
    """With registration disabled, posting the registration form answers 404."""
    registration = dict(
        username='aaa',
        pw='12345678',
        pw2='12345678',
        display_name='Test Me')
    registration_off = {'auth.allow_user_registration': 'false'}
    with h.push_config(config, **registration_off):
        self.app.post('/auth/save_new', params=registration, status=404)
| |
def test_one_project_role(self):
    """Make sure when a user goes to a new project only one project role is created.
    There was an issue with extra project roles getting created if a user went directly to
    an admin page."""
    neighborhood = M.Neighborhood.query.get(name='Projects')
    project = M.Project.query.get(shortname='test',
                                  neighborhood_id=neighborhood._id)
    registration = dict(
        username='aaa',
        pw='12345678',
        pw2='12345678',
        display_name='Test Me',
        email='test@example.com')
    self.app.post('/auth/save_new', params=registration).follow()

    new_user = M.User.query.get(username='aaa')
    new_user.pending = False
    session(new_user).flush(new_user)

    def role_count():
        query = dict(user_id=new_user._id, project_id=project._id)
        return M.ProjectRole.query.find(query).count()

    assert role_count() == 0

    # a forbidden visit to an admin page must not create extra roles
    self.app.get('/p/test/admin/permissions',
                 extra_environ=dict(username='aaa'), status=403)
    assert role_count() <= 1
| |
def test_default_lookup(self):
    """An unknown path under /auth/ answers 404 (default _lookup())."""
    unknown_path = '/auth/foobar'
    self.app.get(unknown_path, status=404)
| |
def test_disabled_user(self):
    """A disabled user is redirected to the login page instead of served."""
    admin_page = '/p/test/admin/'
    as_admin = {'username': 'test-admin'}

    admin = M.User.query.get(username='test-admin')
    orm_session = session(admin)
    assert not admin.disabled
    resp = self.app.get(admin_page, extra_environ=as_admin)
    assert_equal(resp.status_int, 200, 'Redirect to %s' % resp.location)

    # disable the account and persist the change
    admin.disabled = True
    orm_session.save(admin)
    orm_session.flush()

    admin = M.User.query.get(username='test-admin')
    assert admin.disabled
    resp = self.app.get(admin_page, extra_environ=as_admin)
    assert_equal(resp.status_int, 302)
    assert_equal(resp.location,
                 'http://localhost/auth/?return_to=%2Fp%2Ftest%2Fadmin%2F')
| |
def test_no_open_return_to(self):
    """Login honours local ``return_to`` targets and ignores foreign hosts."""
    cases = (
        # (return_to sent with the login, expected redirect location)
        ('/foo', 'http://localhost/foo'),
        ('https://localhost/foo', 'https://localhost/foo'),
        ('http://example.com/foo', 'http://localhost/'),
        ('//example.com/foo', 'http://localhost/'),
    )
    for return_to, expected_location in cases:
        self.app.get('/auth/logout')
        resp = self.app.post('/auth/do_login', params=dict(
            username='test-user', password='foo',
            return_to=return_to))
        assert_equal(resp.location, expected_location)
| |
| |
| class TestPreferences(TestController): |
@td.with_user_project('test-admin')
def test_personal_data(self):
    """Personal data is saved, kept on an invalid date, and birthdate can be cleared."""
    from pytz import country_names

    endpoint = '/auth/user_info/change_personal_data'
    submitted = dict(
        sex='Male',
        birthdate='19/08/1988',
        country='IT',
        city='Milan',
        timezone='Europe/Rome')

    def assert_stored_values_match():
        # Re-read the user and compare every stored field with what was submitted
        stored = M.User.query.get(username='test-admin')
        assert stored.sex == submitted['sex']
        assert stored.birthdate.strftime('%d/%m/%Y') == submitted['birthdate']
        assert country_names.get(submitted['country']) == stored.localization.country
        assert stored.localization.city == submitted['city']
        assert stored.timezone == submitted['timezone']

    self.app.get('/auth/user_info/')

    # Check if personal data is properly set
    self.app.post(endpoint, params=dict(submitted))
    assert_stored_values_match()

    # Check if setting a wrong date everything works correctly
    resp = self.app.post(endpoint, params=dict(birthdate='30/02/1998'))
    assert 'Please enter a valid date' in str(resp)
    assert_stored_values_match()

    # Check deleting birthdate
    self.app.post(endpoint, params=dict(submitted, birthdate=''))
    stored = M.User.query.get(username='test-admin')
    assert stored.birthdate is None
| |
@td.with_user_project('test-admin')
def test_contacts(self):
    # Drives each contact-info endpoint in turn (Skype, social networks,
    # telephone numbers, websites) and re-reads the user after every POST.
    contacts_url = '/auth/user_info/contacts/'

    def reload_admin():
        # Query again so each assertion looks at the current record.
        return M.User.query.get(username='test-admin')

    # --- Skype account ---
    skype_name = 'testaccount'
    self.app.get(contacts_url)
    self.app.post(contacts_url + 'skype_account',
                  params={'skypeaccount': skype_name})
    assert reload_admin().skypeaccount == skype_name

    # --- Social network accounts ---
    fb_name, fb_url = 'Facebook', 'http://www.facebook.com/test'
    tw_name, tw_url = 'Twitter', 'http://twitter.com/test'
    fb_entry = {'socialnetwork': fb_name, 'accounturl': fb_url}
    tw_entry = {'socialnetwork': tw_name, 'accounturl': tw_url}

    def assert_only_twitter_stored():
        stored = reload_admin().socialnetworks
        assert len(stored) == 1
        assert tw_entry in stored

    # First account
    self.app.post(contacts_url + 'add_social_network',
                  params={'socialnetwork': fb_name, 'accounturl': fb_url})
    stored = reload_admin().socialnetworks
    assert len(stored) == 1
    assert stored[0].socialnetwork == fb_name
    assert stored[0].accounturl == fb_url

    # Second account: a bare '@test' handle is posted, while the stored
    # value is expected to be the full profile URL.
    self.app.post(contacts_url + 'add_social_network',
                  params={'socialnetwork': tw_name, 'accounturl': '@test'})
    stored = reload_admin().socialnetworks
    assert len(stored) == 2
    assert fb_entry in stored
    assert tw_entry in stored

    # Removing the first account leaves only the second one
    self.app.post(contacts_url + 'remove_social_network',
                  params={'socialnetwork': fb_name, 'account': fb_url})
    assert_only_twitter_stored()

    # An empty network name changes nothing
    self.app.post(contacts_url + 'add_social_network',
                  params={'accounturl': fb_url, 'socialnetwork': ''})
    assert_only_twitter_stored()

    # An invalid network name changes nothing either
    self.app.post(contacts_url + 'add_social_network',
                  params={'accounturl': fb_url, 'socialnetwork': 'invalid'})
    assert_only_twitter_stored()

    # --- Telephone numbers ---
    first_number, second_number = '+3902123456', '+3902654321'
    self.app.post(contacts_url + 'add_telnumber',
                  params={'newnumber': first_number})
    numbers = reload_admin().telnumbers
    assert len(numbers) == 1
    assert numbers[0] == first_number

    self.app.post(contacts_url + 'add_telnumber',
                  params={'newnumber': second_number})
    numbers = reload_admin().telnumbers
    assert len(numbers) == 2
    assert first_number in numbers
    assert second_number in numbers

    self.app.post(contacts_url + 'remove_telnumber',
                  params={'oldvalue': first_number})
    numbers = reload_admin().telnumbers
    assert len(numbers) == 1
    assert second_number in numbers

    # --- Websites ---
    first_site, second_site = 'http://www.testurl.com', 'http://www.testurl2.com'
    self.app.post(contacts_url + 'add_webpage',
                  params={'newwebsite': first_site})
    pages = reload_admin().webpages
    assert len(pages) == 1
    assert first_site in pages

    self.app.post(contacts_url + 'add_webpage',
                  params={'newwebsite': second_site})
    pages = reload_admin().webpages
    assert len(pages) == 2
    assert first_site in pages
    assert second_site in pages

    self.app.post(contacts_url + 'remove_webpage',
                  params={'oldvalue': first_site})
    pages = reload_admin().webpages
    assert len(pages) == 1
    assert second_site in pages
| |
@td.with_user_project('test-admin')
def test_availability(self):
    # Adds, removes and rejects weekly availability timeslots, checking the
    # stored slots after each request.
    availability_url = '/auth/user_info/availability/'

    def reload_admin():
        return M.User.query.get(username='test-admin')

    def as_form(day, begins, ends):
        # The form carries times as HH:MM strings.
        return {
            'weekday': day,
            'starttime': begins.strftime('%H:%M'),
            'endtime': ends.strftime('%H:%M'),
        }

    mon_day, mon_start, mon_end = 'Monday', time(9, 0, 0), time(12, 0, 0)
    tue_day, tue_start, tue_end = 'Tuesday', time(14, 0, 0), time(16, 0, 0)
    monday_slot = dict(week_day=mon_day, start_time=mon_start, end_time=mon_end)
    tuesday_slot = dict(week_day=tue_day, start_time=tue_start, end_time=tue_end)

    # Add availability timeslot
    self.app.get(availability_url)
    self.app.post(availability_url + 'add_timeslot',
                  params=as_form(mon_day, mon_start, mon_end))
    admin = reload_admin()
    assert len(admin.availability) == 1
    assert monday_slot in admin.get_availability_timeslots()

    # Add second availability timeslot
    self.app.post(availability_url + 'add_timeslot',
                  params=as_form(tue_day, tue_start, tue_end))
    admin = reload_admin()
    assert len(admin.availability) == 2
    assert monday_slot in admin.get_availability_timeslots()
    assert tuesday_slot in admin.get_availability_timeslots()

    # Remove availability timeslot
    self.app.post(availability_url + 'remove_timeslot',
                  params=as_form(mon_day, mon_start, mon_end))
    admin = reload_admin()
    assert len(admin.availability) == 1
    assert tuesday_slot in admin.get_availability_timeslots()

    # Add invalid availability timeslot (start and end swapped)
    resp = self.app.post(availability_url + 'add_timeslot',
                         params=as_form(tue_day, tue_end, tue_start))
    assert 'Invalid period:' in str(resp)
    admin = reload_admin()
    assert len(admin.availability) == 1
    assert tuesday_slot in admin.get_availability_timeslots()
| |
@td.with_user_project('test-admin')
def test_inactivity(self):
    # Adds, removes and rejects inactivity periods, checking the stored
    # periods after each request.
    availability_url = '/auth/user_info/availability/'
    date_format = '%d/%m/%Y'

    def reload_admin():
        return M.User.query.get(username='test-admin')

    def as_form(begins, ends):
        return {
            'startdate': begins.strftime(date_format),
            'enddate': ends.strftime(date_format),
        }

    # Truncate "now" to midnight: the form only carries day/month/year.
    today = datetime.utcnow().date()
    midnight = datetime(today.year, today.month, today.day)
    first_start = midnight + timedelta(days=1)
    first_end = midnight + timedelta(days=7)
    second_start = midnight + timedelta(days=24)
    second_end = midnight + timedelta(days=28)
    first_period = dict(start_date=first_start, end_date=first_end)
    second_period = dict(start_date=second_start, end_date=second_end)

    # Add inactivity period
    self.app.get(availability_url)
    self.app.post(availability_url + 'add_inactive_period',
                  params=as_form(first_start, first_end))
    admin = reload_admin()
    assert len(admin.inactiveperiod) == 1
    assert first_period in admin.get_inactive_periods()

    # Add second inactivity period
    self.app.post(availability_url + 'add_inactive_period',
                  params=as_form(second_start, second_end))
    admin = reload_admin()
    assert len(admin.inactiveperiod) == 2
    assert first_period in admin.get_inactive_periods()
    assert second_period in admin.get_inactive_periods()

    # Remove first inactivity period
    self.app.post(availability_url + 'remove_inactive_period',
                  params=as_form(first_start, first_end))
    admin = reload_admin()
    assert len(admin.inactiveperiod) == 1
    assert second_period in admin.get_inactive_periods()

    # Add invalid inactivity period
    resp = self.app.post(availability_url + 'add_inactive_period',
                         params={'startdate': 'NOT/A/DATE',
                                 'enddate': second_end.strftime(date_format)})
    admin = reload_admin()
    assert 'Please enter a valid date' in str(resp)
    assert len(admin.inactiveperiod) == 1
    assert second_period in admin.get_inactive_periods()
| |
@td.with_user_project('test-admin')
def test_skills(self):
    # Saves, re-saves, rejects and removes a skill for one trove category,
    # checking the user's stored skills after each request.
    setup_trove_categories()
    skills_url = '/auth/user_info/skills/'
    category = M.TroveCategory.query.get(show_as_skill=True)

    def reload_admin():
        return M.User.query.get(username='test-admin')

    def save_skill(skill_level, skill_comment):
        return self.app.post(
            skills_url + 'save_skill',
            params={'level': skill_level,
                    'comment': skill_comment,
                    'selected_skill': str(category.trove_cat_id)})

    def stored_as(skill_level, skill_comment):
        return dict(category_id=category._id,
                    comment=skill_comment, level=skill_level)

    # Add a skill
    self.app.get(skills_url)
    save_skill('low', 'test comment')
    skills = reload_admin().skills
    assert len(skills) == 1
    assert stored_as('low', 'test comment') in skills

    # Add again the same skill: the existing entry is replaced, not duplicated
    self.app.get(skills_url)
    save_skill('medium', 'test comment 2')
    skills = reload_admin().skills
    assert len(skills) == 1
    assert stored_as('medium', 'test comment 2') in skills

    # Add an invalid skill
    save_skill('not a level', 'test comment 2')
    skills = reload_admin().skills
    # Check that everything is as it was before
    assert len(skills) == 1
    assert stored_as('medium', 'test comment 2') in skills

    # Remove a skill
    self.app.get(skills_url)
    self.app.post(skills_url + 'remove_skill',
                  params={'categoryid': str(category.trove_cat_id)})
    assert len(reload_admin().skills) == 0
| |
@td.with_user_project('test-admin')
def test_user_message(self):
    # Posting without 'allow_user_messages' sets the 'disable_user_messages'
    # preference; posting with it set to 'on' clears the preference again.
    endpoint = '/auth/preferences/user_message'

    def messages_disabled():
        admin = M.User.query.get(username='test-admin')
        return admin.get_pref('disable_user_messages')

    assert not messages_disabled()
    self.app.post(endpoint)
    assert messages_disabled()
    self.app.post(endpoint, params={'allow_user_messages': 'on'})
    assert not messages_disabled()
| |
@td.with_user_project('test-admin')
def test_additional_page(self):
    # A preferences provider can contribute extra pages under /auth/, but
    # only methods decorated with @expose() are reachable.
    class StubPreferencesProvider(plugin.UserPreferencesProvider):
        @expose()
        def new_page(self):
            return 'new page'

        def not_page(self):
            # Deliberately not exposed.
            return 'not page'

    with mock.patch.object(plugin.UserPreferencesProvider, 'get') as provider_get:
        provider_get.return_value = StubPreferencesProvider()
        exposed = self.app.get('/auth/new_page')
        assert_equal(exposed.body, 'new page')
        self.app.get('/auth/not_page', status=404)
| |
| |
class TestPasswordReset(TestController):
    # Functional tests for the password recovery endpoints:
    # /auth/password_recovery_hash, /auth/forgotten_password and
    # /auth/set_new_password.
    test_primary_email = 'testprimaryaddr@mail.com'

    @patch('allura.tasks.mail_tasks.sendmail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_email_unconfirmed(self, gen_message_id, sendmail):
        # No recovery hash is stored for an unconfirmed address.
        admin = M.User.query.get(username='test-admin')
        address = M.EmailAddress.query.find(
            dict(claimed_by_user_id=admin._id)).first()
        address.confirmed = False
        ThreadLocalORMSession.flush_all()
        self.app.post('/auth/password_recovery_hash', {'email': address.email})
        assert admin.get_tool_data('AuthPasswordReset', 'hash') is None

    @patch('allura.tasks.mail_tasks.sendmail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_user_disabled(self, gen_message_id, sendmail):
        # No recovery hash is stored for a disabled user.
        admin = M.User.query.get(username='test-admin')
        address = M.EmailAddress.query.find(
            dict(claimed_by_user_id=admin._id)).first()
        admin.disabled = True
        ThreadLocalORMSession.flush_all()
        self.app.post('/auth/password_recovery_hash', {'email': address.email})
        assert admin.get_tool_data('AuthPasswordReset', 'hash') is None

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_only_primary_email_reset_allowed(self, gen_message_id, sendmail):
        # With non-primary resets switched off, a request for the primary
        # address stores a hash and mails that address.
        admin = M.User.query.get(username='test-admin')
        admin.claim_address(self.test_primary_email)
        admin.set_pref('email_address', self.test_primary_email)

        primary = M.EmailAddress.query.find(
            dict(email=self.test_primary_email)).first()
        primary.confirmed = True
        ThreadLocalORMSession.flush_all()

        overrides = {'auth.allow_non_primary_email_password_reset': 'false'}
        with h.push_config(config, **overrides):
            self.app.post('/auth/password_recovery_hash',
                          {'email': self.test_primary_email})
            reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
            assert reset_hash is not None
            _, mail_kwargs = sendmail.post.call_args
            assert_equal(mail_kwargs['toaddr'], self.test_primary_email)

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_non_primary_email_reset_allowed(self, gen_message_id, sendmail):
        # With non-primary resets switched on, a request for the user's other
        # claimed address stores a hash and mails that address.
        admin = M.User.query.get(username='test-admin')
        secondary = M.EmailAddress.query.find(
            dict(claimed_by_user_id=admin._id)).first()
        admin.claim_address(self.test_primary_email)
        admin.set_pref('email_address', self.test_primary_email)
        primary = M.EmailAddress.query.find(
            dict(email=self.test_primary_email)).first()
        primary.confirmed = True
        ThreadLocalORMSession.flush_all()

        overrides = {'auth.allow_non_primary_email_password_reset': 'true'}
        with h.push_config(config, **overrides):
            self.app.post('/auth/password_recovery_hash',
                          {'email': secondary.email})
            reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
            assert reset_hash is not None
            _, mail_kwargs = sendmail.post.call_args
            assert_equal(mail_kwargs['toaddr'], secondary.email)

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_password_reset(self, gen_message_id, sendmail):
        # Full happy path: request a link, open it, set a new password, then
        # check the mail that was sent and that the hash was cleared.
        admin = M.User.query.get(username='test-admin')
        address = M.EmailAddress.query.find(
            dict(claimed_by_user_id=admin._id)).first()
        address.confirmed = True
        ThreadLocalORMSession.flush_all()
        previous_pw_hash = admin.password

        with td.audits('Password recovery link sent to: test-admin@users.localhost', user=True):
            self.app.post('/auth/password_recovery_hash', {'email': address.email})
        reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
        reset_expiry = admin.get_tool_data('AuthPasswordReset', 'hash_expiry')
        assert reset_hash is not None
        assert reset_expiry is not None

        reset_page = self.app.get('/auth/forgotten_password/%s' % reset_hash)
        for fragment in ('Enter a new password for: test-admin',
                         'New Password:',
                         'New Password (again):'):
            assert_in(fragment, reset_page)
        reset_form = reset_page.forms[0]
        new_password = '154321'
        reset_form['pw'] = new_password
        reset_form['pw2'] = new_password
        # Parentheses are escaped so they are not treated as a regex group.
        with td.audits(r'Password changed \(through recovery process\)', user=True):
            reset_form.submit()

        admin = M.User.query.get(username='test-admin')
        assert_not_equal(previous_pw_hash, admin.password)
        provider = plugin.LocalAuthenticationProvider(None)
        assert_true(provider._validate_password(admin, new_password))

        expected_text = '''Your username is test-admin

To reset your password on %s, please visit the following URL:

%s/auth/forgotten_password/%s''' % (config['site_name'], config['base_url'], reset_hash)

        return_path = config['forgemail.return_path']
        sendmail.post.assert_called_once_with(
            toaddr=address.email,
            fromaddr=return_path,
            reply_to=return_path,
            subject='Allura Password recovery',
            message_id=gen_message_id(),
            text=expected_text)

        # Once used, both the hash and its expiry are blanked out.
        admin = M.User.query.get(username='test-admin')
        assert_equal(admin.get_tool_data('AuthPasswordReset', 'hash'), '')
        assert_equal(admin.get_tool_data('AuthPasswordReset', 'hash_expiry'), '')

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_hash_expired(self, gen_message_id, sendmail):
        # A hash whose expiry lies in the past is refused by both the
        # reset page and the set-new-password endpoint.
        failure_message = 'Unable to process reset, please try again'
        admin = M.User.query.get(username='test-admin')
        address = M.EmailAddress.query.find(
            dict(claimed_by_user_id=admin._id)).first()
        address.confirmed = True
        ThreadLocalORMSession.flush_all()
        self.app.post('/auth/password_recovery_hash', {'email': address.email})

        admin = M.User.by_username('test-admin')
        reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
        admin.set_tool_data('AuthPasswordReset',
                            hash_expiry=datetime(2000, 10, 10))

        resp = self.app.get(
            '/auth/forgotten_password/%s' % reset_hash.encode('utf-8'))
        assert_in(failure_message, resp.follow().body)
        resp = self.app.post(
            '/auth/set_new_password/%s' % reset_hash.encode('utf-8'),
            {'pw': '154321', 'pw2': '154321'})
        assert_in(failure_message, resp.follow().body)

    @patch('allura.lib.plugin.AuthenticationProvider')
    def test_provider_disabled(self, AP):
        # When the provider reports no forgotten-password support, all
        # three recovery endpoints answer 404.
        admin = M.User.query.get(username='test-admin')
        provider = AP.get()
        provider.forgotten_password_process = False
        # Make the mocked provider's request/user lookups report test-admin.
        provider.authenticate_request()._id = admin._id
        provider.by_username().username = admin.username

        self.app.get('/auth/forgotten_password', status=404)
        self.app.post('/auth/set_new_password',
                      {'pw': 'foo', 'pw2': 'foo'}, status=404)
        self.app.post('/auth/password_recovery_hash',
                      {'email': 'foo'}, status=404)
| |
| |
class TestOAuth(TestController):
    # Functional tests for OAuth application management (/auth/oauth/) and
    # for the OAuth token endpoints (/rest/oauth/).

    def _register_sample_app(self):
        # Register the sample application and return the page reached by
        # following the redirect.
        return self.app.post(
            '/auth/oauth/register',
            params={'application_name': 'oautstapp',
                    'application_description': 'Oauth rulez'}).follow()

    def _build_consumer_token(self, username, **fields):
        # Returns (user, consumer token). Callers flush the ORM session.
        owner = M.User.by_username(username)
        token = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=owner._id,
            **fields)
        return owner, token

    def _persist_token_pair(self, callback, **request_fields):
        # Creates test-admin's consumer token plus a request token that
        # points at it, flushes both, and returns them.
        owner, consumer = self._build_consumer_token(
            'test-admin', description='ctok_desc')
        request_token = M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=consumer._id,
            callback=callback,
            user_id=owner._id,
            **request_fields)
        ThreadLocalORMSession.flush_all()
        return consumer, request_token

    def _access_request(self, verifier):
        # Parameters the mocked oauth Request reports for an access-token call.
        return {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': verifier,
        }

    def test_register_deregister_app(self):
        # register
        self.app.get('/auth/oauth/')
        apps_page = self._register_sample_app()
        assert 'oautstapp' in apps_page
        # deregister
        deregister_form = apps_page.forms[0]
        assert_equal(deregister_form.action, 'deregister')
        deregister_form.submit()
        apps_page = self.app.get('/auth/oauth/')
        assert 'oautstapp' not in apps_page

    def test_generate_revoke_access_token(self):
        # generate
        apps_page = self._register_sample_app()
        generate_form = apps_page.forms[1]
        assert_equal(generate_form.action, 'generate_access_token')
        generate_form.submit()
        apps_page = self.app.get('/auth/oauth/')
        assert 'Bearer Token:' in apps_page
        assert_not_equal(
            M.OAuthAccessToken.for_user(M.User.by_username('test-admin')), [])
        # revoke
        revoke_form = apps_page.forms[0]
        assert_equal(revoke_form.action, 'revoke_access_token')
        revoke_form.submit()
        apps_page = self.app.get('/auth/oauth/')
        assert_not_equal(apps_page.forms[0].action, 'revoke_access_token')
        assert_equal(
            M.OAuthAccessToken.for_user(M.User.by_username('test-admin')), [])

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_interactive(self, Request, Server):
        # Whole three-step flow: request token -> authorize -> access token.
        # NOTE(review): this replaces the class attribute and nothing in this
        # test puts the original back.
        M.OAuthConsumerToken.consumer = mock.Mock()
        owner, consumer = self._build_consumer_token(
            'test-admin', description='ctok_desc')
        ThreadLocalORMSession.flush_all()

        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_callback': 'http://my.domain.com/callback',
        }
        resp = self.app.post('/rest/oauth/request_token', params={})
        request_key = parse_qs(resp.body)['oauth_token'][0]

        resp = self.app.post('/rest/oauth/authorize',
                             params={'oauth_token': request_key})
        resp = resp.forms[0].submit('yes')
        assert resp.location.startswith('http://my.domain.com/callback')
        pin = parse_qs(urlparse(resp.location).query)['oauth_verifier'][0]

        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': request_key,
            'oauth_verifier': pin,
        }
        resp = self.app.get('/rest/oauth/access_token')
        access = parse_qs(resp.body)
        assert_equal(len(access['oauth_token']), 1)
        assert_equal(len(access['oauth_token_secret']), 1)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_valid(self, Request, Server):
        # NOTE(review): class attribute replaced and not restored here.
        M.OAuthConsumerToken.consumer = mock.Mock()
        owner, consumer = self._build_consumer_token('test-user')
        ThreadLocalORMSession.flush_all()
        oauth_request = {'oauth_consumer_key': 'api_key'}
        Request.from_request.return_value = oauth_request

        resp = self.app.post('/rest/oauth/request_token',
                             params={'key': 'value'})

        Request.from_request.assert_called_once_with(
            'POST', 'http://localhost/rest/oauth/request_token',
            headers={'Host': 'localhost:80', 'Content-Type': 'application/x-www-form-urlencoded; charset="utf-8"'},
            parameters={'key': 'value'},
            query_string='')
        Server().verify_request.assert_called_once_with(
            oauth_request, consumer.consumer, None)
        stored = M.OAuthRequestToken.query.get(consumer_token_id=consumer._id)
        assert_is_not_none(stored)
        assert_equal(resp.body, stored.to_string())

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_no_consumer_token(self, Request, Server):
        # No consumer token exists for the key, so the request is refused.
        Request.from_request.return_value = {'oauth_consumer_key': 'api_key'}
        self.app.post('/rest/oauth/request_token',
                      params={'key': 'value'}, status=403)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_invalid(self, Request, Server):
        # Request verification raising ValueError turns into a 403.
        Server().verify_request.side_effect = ValueError
        # NOTE(review): class attribute replaced and not restored here.
        M.OAuthConsumerToken.consumer = mock.Mock()
        owner, consumer = self._build_consumer_token('test-user')
        ThreadLocalORMSession.flush_all()
        Request.from_request.return_value = {'oauth_consumer_key': 'api_key'}
        self.app.post('/rest/oauth/request_token',
                      params={'key': 'value'}, status=403)

    def test_authorize_ok(self):
        consumer, request_token = self._persist_token_pair('oob')
        resp = self.app.post('/rest/oauth/authorize',
                             params={'oauth_token': 'api_key'})
        assert_in('ctok_desc', resp.body)
        assert_in('api_key', resp.body)

    def test_authorize_invalid(self):
        # Unknown request token.
        self.app.post('/rest/oauth/authorize',
                      params={'oauth_token': 'api_key'}, status=403)

    def test_do_authorize_no(self):
        # Declining authorization removes the request token.
        consumer, request_token = self._persist_token_pair('oob')
        self.app.post('/rest/oauth/do_authorize',
                      params={'no': '1', 'oauth_token': 'api_key'})
        assert_is_none(M.OAuthRequestToken.query.get(api_key='api_key'))

    def test_do_authorize_oob(self):
        # An out-of-band ('oob') callback shows the PIN on the page.
        consumer, request_token = self._persist_token_pair('oob')
        resp = self.app.post('/rest/oauth/do_authorize',
                             params={'yes': '1', 'oauth_token': 'api_key'})
        assert_is_not_none(resp.html.find(text=re.compile('^PIN: ')))

    def test_do_authorize_cb(self):
        # A URL callback receives the token and verifier as query parameters.
        consumer, request_token = self._persist_token_pair(
            'http://my.domain.com/callback')
        resp = self.app.post('/rest/oauth/do_authorize',
                             params={'yes': '1', 'oauth_token': 'api_key'})
        assert resp.location.startswith('http://my.domain.com/callback?oauth_token=api_key&oauth_verifier=')

    def test_do_authorize_cb_params(self):
        # Query parameters already present on the callback URL are kept.
        consumer, request_token = self._persist_token_pair(
            'http://my.domain.com/callback?myparam=foo')
        resp = self.app.post('/rest/oauth/do_authorize',
                             params={'yes': '1', 'oauth_token': 'api_key'})
        assert resp.location.startswith('http://my.domain.com/callback?myparam=foo&oauth_token=api_key&oauth_verifier=')

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_no_consumer(self, Request):
        Request.from_request.return_value = self._access_request('good')
        self.app.get('/rest/oauth/access_token', status=403)

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_no_request(self, Request):
        # A consumer token exists, but there is no request token.
        Request.from_request.return_value = self._access_request('good')
        owner, consumer = self._build_consumer_token(
            'test-admin', description='ctok_desc')
        ThreadLocalORMSession.flush_all()
        self.app.get('/rest/oauth/access_token', status=403)

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_bad_pin(self, Request):
        # The verifier sent ('bad') differs from the stored pin ('good').
        Request.from_request.return_value = self._access_request('bad')
        consumer, request_token = self._persist_token_pair(
            'http://my.domain.com/callback?myparam=foo',
            validation_pin='good')
        self.app.get('/rest/oauth/access_token', status=403)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_bad_sig(self, Request, Server):
        # Correct pin, but request verification raises ValueError.
        Request.from_request.return_value = self._access_request('good')
        consumer, request_token = self._persist_token_pair(
            'http://my.domain.com/callback?myparam=foo',
            validation_pin='good')
        Server().verify_request.side_effect = ValueError
        self.app.get('/rest/oauth/access_token', status=403)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_ok(self, Request, Server):
        Request.from_request.return_value = self._access_request('good')
        consumer, request_token = self._persist_token_pair(
            'http://my.domain.com/callback?myparam=foo',
            validation_pin='good')
        resp = self.app.get('/rest/oauth/access_token')
        access = parse_qs(resp.body)
        assert_equal(len(access['oauth_token']), 1)
        assert_equal(len(access['oauth_token_secret']), 1)
| |
| |
class TestDisableAccount(TestController):
    # Functional tests for the account-disabling pages under /auth/disable/.

    def test_not_authenticated(self):
        # Anonymous visitors are sent to the login page with a return_to.
        resp = self.app.get('/auth/disable/',
                            extra_environ={'username': '*anonymous'})
        assert_equal(resp.status_int, 302)
        assert_equal(
            resp.location,
            'http://localhost/auth/?return_to=%2Fauth%2Fdisable%2F')

    def test_lists_user_projects(self):
        # Every project the user administers appears by name and URL.
        page = self.app.get('/auth/disable/')
        admin = M.User.by_username('test-admin')
        for project in admin.my_projects_by_role_name('Admin'):
            assert_in(project.name, page)
            assert_in(project.url(), page)

    def test_has_asks_password(self):
        page = self.app.get('/auth/disable/')
        disable_form = page.html.find('form', {'action': 'do_disable'})
        assert disable_form is not None

    def test_bad_password(self):
        # A wrong password leaves the account enabled.
        resp = self.app.post('/auth/disable/do_disable', {'password': 'bad'})
        assert_in('Invalid password', resp)
        assert_equal(M.User.by_username('test-admin').disabled, False)

    def test_disable(self):
        # The right password disables the account and redirects home with
        # a success flash message.
        resp = self.app.post('/auth/disable/do_disable', {'password': 'foo'})
        assert_equal(resp.status_int, 302)
        assert_equal(resp.location, 'http://localhost/')
        flash = json.loads(self.webflash(resp))
        assert_equal(flash['status'], 'ok')
        assert_equal(flash['message'], 'Your account was successfully disabled!')
        assert_equal(M.User.by_username('test-admin').disabled, True)
| |
| |
| class TestPasswordExpire(TestController): |
| |
def login(self, username='test-user', pwd='foo', query_string=''):
    """Submit the login form as an anonymous visitor.

    ``query_string`` is appended verbatim to ``/auth/`` when fetching the
    form. Returns the response to the form submission.
    """
    login_page = self.app.get('/auth/' + query_string,
                              extra_environ={'username': '*anonymous'})
    login_form = login_page.forms[0]
    login_form['username'] = username
    login_form['password'] = pwd
    return login_form.submit(extra_environ={'username': '*anonymous'})
| |
def assert_redirects(self, where='/'):
    """Assert that test-user is redirected from ``where`` to the password-expired page.

    :param where: path (optionally with a query string) to request; it is
        also the value expected in the redirect's ``return_to`` parameter.

    The redirect is checked in both forms it can take: a raised
    ``HTTPFound`` (location compared as a site-relative URL) or an ordinary
    302 response (location compared as an absolute ``http://localhost`` URL,
    as in the other ``location`` assertions of these tests).
    """
    expected = '/auth/pwd_expired?' + urlencode({'return_to': where})
    try:
        resp = self.app.get(where, extra_environ={'username': 'test-user'}, status=302)
    except exc.HTTPFound as e:
        assert_equal(e.location, expected)
    else:
        # Without this branch a 302 returned as a normal response passed
        # no matter where it pointed.
        assert_equal(resp.location, 'http://localhost' + expected)
| |
def assert_not_redirects(self):
    """Assert that test-user gets a plain 200 for the site root."""
    as_test_user = {'username': 'test-user'}
    self.app.get('/', extra_environ=as_test_user, status=200)
| |
def test_disabled(self):
    # With no expiry setting pushed, logging in does not flag the password
    # as expired and the site root is served normally.
    login_response = self.login()
    expired_flag = login_response.session.get('pwd-expired')
    assert_false(expired_flag)
    self.assert_not_redirects()
| |
def expired(self, r):
    """Return the 'pwd-expired' entry of the response's session (None if unset)."""
    session_data = r.session
    return session_data.get('pwd-expired')
| |
def set_expire_for_user(self, username='test-user', days=100):
    """Backdate the user's last password update by ``days`` days and flush it.

    Returns the modified user.
    """
    account = M.User.by_username(username)
    password_age = timedelta(days=days)
    account.last_password_updated = datetime.utcnow() - password_age
    session(account).flush(account)
    return account
| |
def test_days(self):
    # 'auth.pwdexpire.days': a password last updated 100 days ago (the
    # set_expire_for_user default) is fine under a 180-day limit and expired
    # under a 90-day limit.
    self.set_expire_for_user()

    lenient = {'auth.pwdexpire.days': 180}
    with h.push_config(config, **lenient):
        resp = self.login()
        assert_false(self.expired(resp))
        self.assert_not_redirects()

    strict = {'auth.pwdexpire.days': 90}
    with h.push_config(config, **strict):
        resp = self.login()
        assert_true(self.expired(resp))
        self.assert_redirects()
| |
def test_before(self):
    # 'auth.pwdexpire.before' takes a Unix timestamp: a password last
    # updated 100 days ago is fine against a cutoff 180 days back and
    # expired against a cutoff 90 days back.
    self.set_expire_for_user()

    def timestamp_days_ago(days):
        moment = datetime.utcnow() - timedelta(days=days)
        return calendar.timegm(moment.timetuple())

    with h.push_config(config, **{'auth.pwdexpire.before': timestamp_days_ago(180)}):
        resp = self.login()
        assert_false(self.expired(resp))
        self.assert_not_redirects()

    with h.push_config(config, **{'auth.pwdexpire.before': timestamp_days_ago(90)}):
        resp = self.login()
        assert_true(self.expired(resp))
        self.assert_redirects()
| |
| def test_logout(self): |
| # Checks that after /auth/logout the 'pwd-expired' session flag is no longer |
| # set and '/' is served without the forced redirect. |
| # NOTE(review): the extent of the `with` block is not recoverable from this |
| # copy of the file - presumably every statement below runs with the 90-day |
| # expiry setting active; confirm against the original indentation. |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| # Password was last updated 100 days ago, so login flags it as expired. |
| r = self.login() |
| assert_true(self.expired(r)) |
| self.assert_redirects() |
| r = self.app.get('/auth/logout', extra_environ={'username': 'test-user'}) |
| assert_false(self.expired(r)) |
| self.assert_not_redirects() |
| |
| def test_change_pwd(self): |
| # Expires the password, logs in, changes the password through |
| # /auth/pwd_expired, then verifies the new credentials work and the old |
| # ones do not. |
| # NOTE(review): the extent of the `with` block is not recoverable from this |
| # copy of the file - presumably the whole test runs with the 90-day expiry |
| # setting active; confirm against the original indentation. |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login() |
| assert_true(self.expired(r)) |
| self.assert_redirects() |
| |
| # Remember the stored values so the change can be detected afterwards. |
| user = M.User.by_username('test-user') |
| old_update_time = user.last_password_updated |
| old_password = user.password |
| r = self.app.get('/auth/pwd_expired', extra_environ={'username': 'test-user'}) |
| f = r.forms[0] |
| # 'foo' is the password login() uses by default, i.e. the current one. |
| f['oldpw'] = 'foo' |
| f['pw'] = 'qwerty' |
| f['pw2'] = 'qwerty' |
| r = f.submit(extra_environ={'username': 'test-user'}, status=302) |
| assert_equal(r.location, 'http://localhost/') |
| assert_false(self.expired(r)) |
| user = M.User.by_username('test-user') |
| assert_true(user.last_password_updated > old_update_time) |
| assert_not_equal(user.password, old_password) |
| |
| # Can log in with new password and change isn't required anymore |
| r = self.login(pwd='qwerty') |
| assert_equal(r.location, 'http://localhost/') |
| assert_not_in('Invalid login', r) |
| assert_false(self.expired(r)) |
| self.assert_not_redirects() |
| |
| # and can't log in with old password |
| r = self.login(pwd='foo') |
| assert_in('Invalid login', r) |
| |
def check_validation(self, oldpw, pw, pw2):
    """Submit the password-expired form with values that must be rejected.

    Asserts that the session is still flagged as expired and that neither
    the stored password nor its last-updated time changed. Returns the
    response to the form submission so callers can inspect the message.
    """
    account = M.User.by_username('test-user')
    update_time_before = account.last_password_updated
    password_before = account.password

    change_page = self.app.get('/auth/pwd_expired',
                               extra_environ={'username': 'test-user'})
    change_form = change_page.forms[0]
    for field, value in (('oldpw', oldpw), ('pw', pw), ('pw2', pw2)):
        change_form[field] = value
    resp = change_form.submit(extra_environ={'username': 'test-user'})

    assert_true(self.expired(resp))
    account = M.User.by_username('test-user')
    assert_equal(account.last_password_updated, update_time_before)
    assert_equal(account.password, password_before)
    return resp
| |
| def test_change_pwd_validation(self): |
| # Feeds the password-expired form a series of invalid inputs and checks the |
| # message shown for each; check_validation() itself asserts that nothing |
| # was changed. |
| # NOTE(review): the extent of the `with` blocks is not recoverable from this |
| # copy of the file - presumably everything runs with the 90-day expiry |
| # setting active and the last check is nested inside it; confirm against |
| # the original indentation. |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login() |
| assert_true(self.expired(r)) |
| self.assert_redirects() |
| |
| # All fields empty |
| r = self.check_validation('', '', '') |
| assert_in('Please enter a value', r) |
| # New password too short |
| r = self.check_validation('', 'qwe', 'qwerty') |
| assert_in('Enter a value 6 characters long or more', r) |
| # New password and its confirmation differ |
| r = self.check_validation('bad', 'qwerty1', 'qwerty') |
| assert_in('Passwords must match', r) |
| # Wrong current password: reported through the flash message, with a |
| # redirect back to the form |
| r = self.check_validation('bad', 'qwerty', 'qwerty') |
| assert_in('Incorrect password', self.webflash(r)) |
| assert_equal(r.location, 'http://localhost/auth/pwd_expired?return_to=') |
| |
| # Lower the minimum length to 3 so that 'foo' passes the length check and |
| # the same-as-old check is the one that fires. |
| with h.push_config(config, **{'auth.min_password_len': 3}): |
| r = self.check_validation('foo', 'foo', 'foo') |
| assert_in('Your old and new password should not be the same', r) |
| |
| def test_return_to(self): |
| # Checks that the page requested at login is carried through the forced |
| # password change: login and direct access both redirect to |
| # /auth/pwd_expired with return_to, and a successful change redirects to |
| # the original URL. |
| # NOTE(review): the extent of the `with` block is not recoverable from this |
| # copy of the file - presumably the whole test runs with the 90-day expiry |
| # setting active; confirm against the original indentation. |
| return_to = '/p/test/tickets/?milestone=1.0&page=2' |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login(query_string='?' + urlencode({'return_to': return_to})) |
| # don't go to the return_to yet |
| assert_equal(r.location, 'http://localhost/auth/pwd_expired?' + urlencode({'return_to': return_to})) |
| |
| # but if user tries to go directly there anyway, intercept and redirect back |
| self.assert_redirects(where=return_to) |
| |
| r = self.app.get('/auth/pwd_expired', extra_environ={'username': 'test-user'}) |
| f = r.forms[0] |
| # 'foo' is the password login() uses by default, i.e. the current one. |
| f['oldpw'] = 'foo' |
| f['pw'] = 'qwerty' |
| f['pw2'] = 'qwerty' |
| f['return_to'] = return_to |
| r = f.submit(extra_environ={'username': 'test-user'}, status=302) |
| assert_equal(r.location, 'http://localhost/p/test/tickets/?milestone=1.0&page=2') |