| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import calendar |
| from datetime import datetime, time, timedelta |
| from time import time as time_time |
| import json |
| from urlparse import urlparse, parse_qs |
| from urllib import urlencode |
| |
| from bson import ObjectId |
| import re |
| from ming.orm.ormsession import ThreadLocalORMSession, session |
| from tg import config, expose |
| from mock import patch, Mock |
| import mock |
| from nose.tools import ( |
| assert_equal, |
| assert_not_equal, |
| assert_is_none, |
| assert_is_not_none, |
| assert_in, |
| assert_not_in, |
| assert_true, |
| assert_false, |
| assert_raises |
| ) |
| from tg import tmpl_context as c, app_globals as g |
| from webob import exc |
| import oauth2 |
| |
| from allura.tests import TestController |
| from allura.tests import decorators as td |
| from allura.tests.decorators import audits, out_audits |
| from alluratest.controller import setup_trove_categories |
| from allura import model as M |
| from allura.lib import plugin |
| from allura.lib import helpers as h |
| from allura.lib.multifactor import TotpService, RecoveryCodeService |
| |
| |
def unentity(s):
    """Return ``s`` with HTML-encoded double quotes decoded back to ``"``.

    Both the named entity (``&quot;``) and the numeric character reference
    (``&#34;``) are replaced; all other text is left untouched.

    :param s: string that may contain encoded double quotes
    :return: the string with those entities replaced by ``"``
    """
    # Replacing '"' with '"' would be a no-op, so spell the entities out.
    return s.replace('&quot;', '"').replace('&#34;', '"')
| |
| |
| class TestAuth(TestController): |
def test_login(self):
    """Walk through address verification followed by several login attempts.

    Covers a bogus and a real verification nonce, a successful login, a
    login carrying a bad honeypot value, a wrong password and an unknown
    username.
    """
    self.app.get('/auth/')
    self.app.post('/auth/send_verification_link', params={
        'a': 'test@example.com',
        '_session_id': self.app.cookies['_session_id'],
    })
    admin_email = M.User.query.get(username='test-admin').email_addresses[0]
    self.app.post('/auth/send_verification_link', params={
        'a': admin_email,
        '_session_id': self.app.cookies['_session_id'],
    })
    ThreadLocalORMSession.flush_all()

    # a nonce that matches nothing must be reported as an error
    resp = self.app.get('/auth/verify_addr', params={'a': 'foo'})
    flash = self.webflash(resp)
    assert json.loads(flash)['status'] == 'error', flash

    # the nonce stored on the address record must be accepted
    address = M.EmailAddress.find({'email': admin_email}).first()
    resp = self.app.get('/auth/verify_addr', params={'a': address.nonce})
    flash = self.webflash(resp)
    assert json.loads(flash)['status'] == 'ok', flash
    self.app.get('/auth/logout')

    with audits('Successful login', user=True):
        good_login = {
            'username': 'test-user',
            'password': 'foo',
            '_session_id': self.app.cookies['_session_id'],
        }
        resp = self.app.post('/auth/do_login', params=good_login, antispam=True).follow()
        assert_equal(resp.headers['Location'], 'http://localhost/dashboard')

    honeypot_login = {
        'username': 'test-user',
        'password': 'foo',
        'honey1': 'robot',  # bad honeypot value
        '_session_id': self.app.cookies['_session_id'],
    }
    resp = self.app.post(
        '/auth/do_login',
        antispam=True,
        params=honeypot_login,
        extra_environ={'regular_antispam_err_handling_even_when_tests': 'true'},
        status=302)
    flash_data = json.loads(self.webflash(resp))
    assert_equal(flash_data['status'], 'error')
    assert_equal(flash_data['message'], 'Spambot protection engaged')

    with audits('Failed login', user=True):
        wrong_password = {
            'username': 'test-user',
            'password': 'food',
            '_session_id': self.app.cookies['_session_id'],
        }
        resp = self.app.post('/auth/do_login', antispam=True, params=wrong_password)
        assert 'Invalid login' in str(resp), resp.showbrowser()

    unknown_user = {
        'username': 'test-usera',
        'password': 'foo',
        '_session_id': self.app.cookies['_session_id'],
    }
    resp = self.app.post('/auth/do_login', antispam=True, params=unknown_user)
    assert 'Invalid login' in str(resp), resp.showbrowser()
| |
def test_login_diff_ips_ok(self):
    """Submit the login form from a different REMOTE_ADDR than the one that loaded it.

    Exercises the AntiSpam.validate methods: the form is fetched from
    11.22.33.44 and submitted from 11.22.33.99, and a successful login must
    still be audited.
    """
    login_page = self.app.get(
        '/auth/',
        extra_environ={'username': '*anonymous', 'REMOTE_ADDR': '11.22.33.44'})

    form = login_page.forms[0]
    field_names = self.app.antispam_field_names(form)
    form[field_names['username']] = 'test-user'
    form[field_names['password']] = 'foo'
    with audits('Successful login', user=True):
        form.submit(extra_environ={'username': '*anonymous', 'REMOTE_ADDR': '11.22.33.99'})
| |
def test_login_diff_ips_bad(self):
    """Submit the login form from an address the antispam check rejects.

    Exercises the AntiSpam.validate methods: the form is fetched from
    24.52.32.123 and submitted from 11.22.33.99, which must end in a
    redirect carrying the spambot error flash.
    """
    login_page = self.app.get(
        '/auth/',
        extra_environ={'username': '*anonymous', 'REMOTE_ADDR': '24.52.32.123'})

    form = login_page.forms[0]
    field_names = self.app.antispam_field_names(form)
    form[field_names['username']] = 'test-user'
    form[field_names['password']] = 'foo'
    submit_environ = {
        'username': '*anonymous',
        'REMOTE_ADDR': '11.22.33.99',
        'regular_antispam_err_handling_even_when_tests': 'true',
    }
    resp = form.submit(extra_environ=submit_environ, status=302)
    flash = json.loads(self.webflash(resp))
    assert_equal(flash['status'], 'error')
    assert_equal(flash['message'], 'Spambot protection engaged')
| |
@patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
@patch('allura.tasks.mail_tasks.sendsimplemail')
def test_login_hibp_compromised_password_untrusted_client(self, sendsimplemail):
    """Block a login with an HIBP-flagged password coming from an untrusted client.

    The HIBP check is patched to report itself as enabled.  The submitted
    form must be answered with status 200 and a page telling the user to
    reset the password via email, exactly one mail must be posted through
    the patched ``sendsimplemail`` task, and no ``UserLoginDetails`` record
    may exist afterwards.

    :param sendsimplemail: mock replacing ``allura.tasks.mail_tasks.sendsimplemail``
    """
    # first & only login by this user, so won't have any trusted previous logins
    self.app.extra_environ = {'disable_auth_magic': 'True'}
    r = self.app.get('/auth/')
    f = r.forms[0]
    encoded = self.app.antispam_field_names(f)
    f[encoded['username']] = 'test-user'
    f[encoded['password']] = 'foo'

    with audits('Attempted login from untrusted location with password in HIBP breach database', user=True):
        r = f.submit(status=200)

    r.mustcontain('reset your password via email.')
    r.mustcontain('reset your password via email.<br>\nPlease check your email')

    # Assert the call count before reading call_args: call_args is None when
    # the mock was never called, and unpacking it would raise a TypeError
    # that hides the real failure.
    assert_equal(sendsimplemail.post.call_count, 1)
    kwargs = sendsimplemail.post.call_args[1]
    assert_equal(kwargs['subject'], u'Update your %s password' % config['site_name'])
    assert_in('/auth/forgotten_password/', kwargs['text'])

    assert_equal([], M.UserLoginDetails.query.find().all())  # no records created
| |
@patch('allura.tasks.mail_tasks.sendsimplemail')
def test_login_hibp_compromised_password_trusted_client(self, sendsimplemail):
    """Allow an HIBP-flagged password from a trusted client but force a password update.

    A first, ordinary login is made so the client address gets recorded.
    The second login runs with the HIBP check patched to be enabled; it
    must be audited as coming from a trusted source, mark the session as
    password-expired with reason ``hibp`` and redirect to
    ``/auth/pwd_expired``.

    :param sendsimplemail: mock replacing ``allura.tasks.mail_tasks.sendsimplemail``
    """
    self.app.extra_environ = {'disable_auth_magic': 'True'}

    # regular login first, so IP address will be recorded and then trusted
    first_page = self.app.get('/auth/')
    first_form = first_page.forms[0]
    first_fields = self.app.antispam_field_names(first_form)
    first_form[first_fields['username']] = 'test-user'
    first_form[first_fields['password']] = 'foo'
    with audits('Successful login', user=True):
        first_form.submit(status=302)
    self.app.get('/auth/logout')

    # this login will get caught by HIBP check, but trusted due to IP address being same
    hibp_enabled = patch(
        'allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled',
        Mock(return_value=True))
    with hibp_enabled:
        second_page = self.app.get('/auth/')
        second_form = second_page.forms[0]
        second_fields = self.app.antispam_field_names(second_form)
        second_form[second_fields['username']] = 'test-user'
        second_form[second_fields['password']] = 'foo'

        with audits(r'Successful login with password in HIBP breach database, from trusted source '
                    r'\(reason: exact ip\)', user=True):
            resp = second_form.submit(status=302)

        assert resp.session.get('pwd-expired')
        assert_equal(resp.session.get('expired-reason'), 'hibp')
        assert_equal(resp.location, 'http://localhost/auth/pwd_expired')

        expired_page = resp.follow()
        expired_page.mustcontain('must be updated to be more secure')

        # changing password covered in TestPasswordExpire
| |
def test_logout(self):
    """Log in, log out, and check the session id and the navigation link change.

    After login the last link in the main navigation must read "Log Out";
    after logout it must read "Log In" and the session must carry a
    different ``_id``.
    """
    self.app.extra_environ = {'disable_auth_magic': 'True'}
    nav_pattern = ('nav', {'class': 'nav-main'})
    r = self.app.get('/auth/')

    r = self.app.post('/auth/do_login', params=dict(
        username='test-user', password='foo',
        _session_id=self.app.cookies['_session_id']),
        extra_environ={'REMOTE_ADDR': '127.0.0.1'},
        antispam=True).follow().follow()

    logged_in_session = r.session['_id']
    links = r.html.find(*nav_pattern).findAll('a')
    assert_equal(links[-1].string, "Log Out")

    r = self.app.get('/auth/logout').follow().follow()
    logged_out_session = r.session['_id']
    # Compare by value: an identity check ("is not") holds for almost any two
    # string objects, even equal ones, so it would never catch a reused id.
    assert_not_equal(logged_in_session, logged_out_session)
    links = r.html.find(*nav_pattern).findAll('a')
    assert_equal(links[-1].string, 'Log In')
| |
def test_track_login(self):
    """Check that a login fills in the user's ``last_access`` login fields.

    All three fields start out as None; after logging in with a known
    User-Agent and REMOTE_ADDR the date must be set and the IP and agent
    must match what was sent.
    """
    before = M.User.by_username('test-user')
    for field in ('login_date', 'login_ip', 'login_ua'):
        assert_equal(before.last_access[field], None)

    self.app.get('/').follow()  # establish session
    credentials = {
        'username': 'test-user',
        'password': 'foo',
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post(
        '/auth/do_login',
        headers={'User-Agent': 'browser'},
        extra_environ={'REMOTE_ADDR': '127.0.0.1'},
        params=credentials,
        antispam=True)

    after = M.User.by_username('test-user')
    assert_not_equal(after.last_access['login_date'], None)
    assert_equal(after.last_access['login_ip'], '127.0.0.1')
    assert_equal(after.last_access['login_ua'], 'browser')
| |
def test_rememberme(self):
    """Check session flags and cookie lifetime with and without "remember me".

    Without the checkbox the session has ``login_expires`` set to True and
    no ``Set-cookie`` header mentions ``expires``; with the checkbox the
    flag is not True and every ``Set-cookie`` header mentions ``expires``.
    """
    expected_username = M.User.query.get(username='test-user').username

    self.app.get('/').follow()  # establish session

    def set_cookie_values(response):
        # contents of every header named exactly 'Set-cookie'
        return [contents for header, contents in response.headerlist
                if header == 'Set-cookie']

    # Login as test-user with remember me checkbox off
    plain = self.app.post('/auth/do_login', antispam=True, params={
        'username': 'test-user',
        'password': 'foo',
        '_session_id': self.app.cookies['_session_id'],
    })
    assert_equal(plain.session['username'], expected_username)
    assert_equal(plain.session['login_expires'], True)
    for contents in set_cookie_values(plain):
        assert_not_in('expires', contents)

    # Login as test-user with remember me checkbox on
    remembered = self.app.post('/auth/do_login', antispam=True, params={
        'username': 'test-user',
        'password': 'foo',
        'rememberme': 'on',
        '_session_id': self.app.cookies['_session_id'],
    })
    assert_equal(remembered.session['username'], expected_username)
    assert_not_equal(remembered.session['login_expires'], True)
    for contents in set_cookie_values(remembered):
        assert_in('expires', contents)
| |
@td.with_user_project('test-admin')
def test_user_can_not_claim_duplicate_emails(self):
    """Claiming the same address twice as one user must fail the second time.

    The first claim creates exactly one ``EmailAddress`` record for the
    user; the second claim flashes an error and leaves both the record
    count and the user's address list unchanged.
    """
    email_address = 'test_abcd_123@domain.net'
    admin = M.User.query.get(username='test-admin')
    initial_count = len(admin.email_addresses)
    self.app.get('/').follow()  # establish session

    def claim():
        # the session id is looked up for every request, as in a fresh form post
        return self.app.post(
            '/auth/preferences/update_emails',
            params={
                'new_addr.addr': email_address,
                'new_addr.claim': 'Claim Address',
                'primary_addr': 'test-admin@users.localhost',
                'preferences.email_format': 'plain',
                'password': 'foo',
                '_session_id': self.app.cookies['_session_id'],
            },
            extra_environ=dict(username='test-admin'))

    def own_claims():
        query = dict(email=email_address, claimed_by_user_id=admin._id)
        return M.EmailAddress.find(query).count()

    claim()
    assert own_claims() == 1

    second = claim()
    flash = self.webflash(second)
    assert json.loads(flash)['status'] == 'error', flash
    assert own_claims() == 1
    assert len(M.User.query.get(username='test-admin').email_addresses) == initial_count + 1
| |
@td.with_user_project('test-admin')
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_user_added_claimed_address_by_other_user_confirmed(self, gen_message_id, sendsimplemail):
    """Claim an address that another user has already claimed and confirmed.

    test-user owns and has confirmed the address; test-admin then claims
    it too.  The claim is flashed as ok, exactly one "claim attempt" mail
    is posted to the address, test-admin gains one entry in
    ``email_addresses`` and two ``EmailAddress`` records exist for it.

    :param gen_message_id: mock replacing ``allura.lib.helpers.gen_message_id``
    :param sendsimplemail: mock replacing ``allura.tasks.mail_tasks.sendsimplemail``
    """
    self.app.get('/').follow()  # establish session
    email_address = 'test_abcd_123@domain.net'

    # test-user claimed & confirmed email address
    user = M.User.query.get(username='test-user')
    user.claim_address(email_address)
    email = M.EmailAddress.find(dict(email=email_address)).first()
    email.confirmed = True
    ThreadLocalORMSession.flush_all()

    # Claiming the same email address by test-admin: the address is added to
    # the email_addresses list and a single "claim attempt" mail goes out.
    admin = M.User.query.get(username='test-admin')
    addresses_number = len(admin.email_addresses)
    r = self.app.post('/auth/preferences/update_emails',
                      params={
                          'new_addr.addr': email_address,
                          'new_addr.claim': 'Claim Address',
                          'primary_addr': 'test-admin@users.localhost',
                          'preferences.email_format': 'plain',
                          'password': 'foo',
                          '_session_id': self.app.cookies['_session_id'],
                      },
                      extra_environ=dict(username='test-admin'))

    assert json.loads(self.webflash(r))['status'] == 'ok'
    assert json.loads(self.webflash(r))['message'] == 'A verification email has been sent.  ' \
                                                      'Please check your email and click to confirm.'

    # Assert the call count before reading call_args: call_args is None when
    # the mock was never called, and unpacking it would raise a TypeError
    # that hides the real failure.
    assert sendsimplemail.post.call_count == 1
    kwargs = sendsimplemail.post.call_args[1]
    assert kwargs['toaddr'] == email_address
    assert kwargs['subject'] == u'%s - Email address claim attempt' % config['site_name']
    assert "You tried to add %s to your Allura account, " \
           "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text']

    assert len(M.User.query.get(username='test-admin').email_addresses) == addresses_number + 1
    assert len(M.EmailAddress.find(dict(email=email_address)).all()) == 2
| |
@td.with_user_project('test-admin')
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_user_added_claimed_address_by_other_user_not_confirmed(self, gen_message_id, sendsimplemail):
    """Claim an address that another user has claimed but not confirmed.

    test-user holds an unconfirmed claim; test-user-1 then claims the same
    address.  The claim is flashed as ok, the patched mail task is used,
    test-user-1 gains one entry in ``email_addresses`` and two
    ``EmailAddress`` records exist for the address.

    :param gen_message_id: mock replacing ``allura.lib.helpers.gen_message_id``
    :param sendsimplemail: mock replacing ``allura.tasks.mail_tasks.sendsimplemail``
    """
    email_address = 'test_abcd_1235@domain.net'

    # test-user claims the address and leaves it unconfirmed
    first_claimant = M.User.query.get(username='test-user')
    first_claimant.claim_address(email_address)
    first_record = M.EmailAddress.find(dict(email=email_address)).first()
    first_record.confirmed = False
    ThreadLocalORMSession.flush_all()

    # test-user-1 now claims the very same address
    second_claimant = M.User.query.get(username='test-user-1')
    count_before = len(second_claimant.email_addresses)
    self.app.get('/').follow()  # establish session
    claim_params = {
        'new_addr.addr': email_address,
        'new_addr.claim': 'Claim Address',
        'primary_addr': 'test-user-1@users.localhost',
        'preferences.email_format': 'plain',
        'password': 'foo',
        '_session_id': self.app.cookies['_session_id'],
    }
    resp = self.app.post('/auth/preferences/update_emails',
                         params=claim_params,
                         extra_environ=dict(username='test-user-1'))

    assert json.loads(self.webflash(resp))['status'] == 'ok'
    assert json.loads(self.webflash(resp))['message'] == 'A verification email has been sent.  ' \
                                                         'Please check your email and click to confirm.'
    assert sendsimplemail.post.called
    assert len(M.User.query.get(username='test-user-1').email_addresses) == count_before + 1
    assert len(M.EmailAddress.find(dict(email=email_address)).all()) == 2
| |
@td.with_user_project('test-admin')
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_user_cannot_claim_more_than_max_limit(self, gen_message_id, sendsimplemail):
    """Enforce ``user_prefs.maximum_claimed_emails`` when claiming addresses.

    With the limit configured as 2, test-user-1's first new claim is
    accepted and the next one is refused with an explanatory flash.

    :param gen_message_id: mock replacing ``allura.lib.helpers.gen_message_id``
    :param sendsimplemail: mock replacing ``allura.tasks.mail_tasks.sendsimplemail``
    """
    def claim(address):
        # build the form data per request so the current session id is used
        return self.app.post(
            '/auth/preferences/update_emails',
            params={
                'new_addr.addr': address,
                'new_addr.claim': 'Claim Address',
                'primary_addr': 'test-user-1@users.localhost',
                'preferences.email_format': 'plain',
                'password': 'foo',
                '_session_id': self.app.cookies['_session_id'],
            },
            extra_environ=dict(username='test-user-1'))

    limit = {'user_prefs.maximum_claimed_emails': '2'}
    with h.push_config(config, **limit):
        self.app.get('/').follow()  # establish session

        accepted = claim('test_abcd_1@domain.net')
        assert json.loads(self.webflash(accepted))['status'] == 'ok'

        refused = claim('test_abcd_2@domain.net')
        assert json.loads(self.webflash(refused))['status'] == 'error'
        assert json.loads(self.webflash(refused))['message'] == 'You cannot claim more than 2 email addresses.'
| |
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_verification_link_for_confirmed_email(self, gen_message_id, sendsimplemail):
    """Request a verification link for an address another user already confirmed.

    test-user holds the confirmed claim and test-user-1 an unconfirmed one.
    When test-user-1 asks for a verification link the flash reports
    success, and exactly one "claim attempt" mail naming test-user's
    account is posted to the address.

    :param gen_message_id: mock replacing ``allura.lib.helpers.gen_message_id``
    :param sendsimplemail: mock replacing ``allura.tasks.mail_tasks.sendsimplemail``
    """
    self.app.get('/').follow()  # establish session
    email_address = 'test_abcd@domain.net'

    # test-user claimed email address
    user = M.User.query.get(username='test-user')
    user.claim_address(email_address)
    email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first()
    email.confirmed = True

    # test-user-1 claims the same address but has not confirmed it
    user1 = M.User.query.get(username='test-user-1')
    user1.claim_address(email_address)
    email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user1._id)).first()
    email.confirmed = False

    ThreadLocalORMSession.flush_all()

    r = self.app.post('/auth/send_verification_link',
                      params=dict(a=email_address, _session_id=self.app.cookies['_session_id']),
                      extra_environ=dict(username='test-user-1', _session_id=self.app.cookies['_session_id']))

    assert json.loads(self.webflash(r))['status'] == 'ok'
    assert json.loads(self.webflash(r))['message'] == 'Verification link sent'

    # Assert the call count before reading call_args: call_args is None when
    # the mock was never called, and unpacking it would raise a TypeError
    # that hides the real failure.
    assert sendsimplemail.post.call_count == 1
    kwargs = sendsimplemail.post.call_args[1]
    assert kwargs['toaddr'] == email_address
    assert kwargs['subject'] == u'%s - Email address claim attempt' % config['site_name']
    assert "You tried to add %s to your Allura account, " \
           "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text']
| |
def test_invalidate_verification_link_if_email_was_confirmed(self):
    """A verification link must stop working once another user confirmed the address.

    test-user requests a link for an unconfirmed claim, then test-user-1
    claims and confirms the same address.  Using test-user's link
    afterwards must flash an error and leave test-user's claim unconfirmed.
    """
    self.app.get('/').follow()  # establish session
    email_address = 'test_abcd@domain.net'

    # test-user claims the address without confirming it
    claimant = M.User.query.get(username='test-user')
    claimant.claim_address(email_address)
    pending = M.EmailAddress.find(
        dict(email=email_address, claimed_by_user_id=claimant._id)).first()
    pending.confirmed = False
    ThreadLocalORMSession.flush_all()

    link_request = dict(a=email_address, _session_id=self.app.cookies['_session_id'])
    self.app.post('/auth/send_verification_link',
                  params=link_request,
                  extra_environ=dict(username='test-user'))

    # test-user-1 claims and confirms the same address in the meantime
    rival = M.User.query.get(username='test-user-1')
    rival.claim_address(email_address)
    rival_record = M.EmailAddress.find(
        dict(email=email_address, claimed_by_user_id=rival._id)).first()
    rival_record.confirmed = True
    ThreadLocalORMSession.flush_all()

    # Verify first email with the verification link
    resp = self.app.get('/auth/verify_addr',
                        params=dict(a=pending.nonce),
                        extra_environ=dict(username='test-user'))

    assert json.loads(self.webflash(resp))['status'] == 'error'
    reloaded = M.EmailAddress.find(
        dict(email=email_address, claimed_by_user_id=claimant._id)).first()
    assert not reloaded.confirmed
| |
def test_verify_addr_correct_session(self):
    """A verification link only works for the user who owns the claim.

    Anonymous visitors and other users are redirected to the login page
    (the latter with a warning flash); the owning user gets the address
    confirmed.
    """
    self.app.get('/').follow()  # establish session
    email_address = 'test_abcd@domain.net'

    # test-user claims the address without confirming it
    owner = M.User.query.get(username='test-user')
    owner.claim_address(email_address)
    record = M.EmailAddress.find(
        dict(email=email_address, claimed_by_user_id=owner._id)).first()
    record.confirmed = False
    ThreadLocalORMSession.flush_all()

    self.app.post('/auth/send_verification_link',
                  params=dict(a=email_address,
                              _session_id=self.app.cookies['_session_id']),
                  extra_environ=dict(username='test-user'))

    def verify_as(username):
        # the nonce is read off the record at request time
        return self.app.get('/auth/verify_addr',
                            params=dict(a=record.nonce),
                            extra_environ=dict(username=username))

    login_redirect = '/auth/?return_to=%2Fauth%2Fverify_addr'

    # logged out, gets redirected to login page
    resp = verify_as('*anonymous')
    assert_in(login_redirect, resp.location)

    # logged in as someone else
    resp = verify_as('test-admin')
    assert_in(login_redirect, resp.location)
    assert_equal('You must be logged in to the correct account', json.loads(self.webflash(resp))['message'])
    assert_equal('warning', json.loads(self.webflash(resp))['status'])

    # logged in as correct user
    resp = verify_as('test-user')
    assert_in('confirmed', json.loads(self.webflash(resp))['message'])
    assert_equal('ok', json.loads(self.webflash(resp))['status'])
| |
@staticmethod
def _create_password_reset_hash():
    """ Stores a fixed password reset token on the ``test-admin`` user.

    A hard-coded hash and expiry are written to the ``AuthPasswordReset``
    tool data, the user is flushed, and both values are checked to read
    back unchanged.

    :return: the ``test-admin`` user carrying the reset token
    :rtype: User
    """
    admin = M.User.by_username('test-admin')
    admin.set_tool_data('AuthPasswordReset',
                        hash="generated_hash_value",
                        hash_expiry="04-08-2020")
    # the hash is read back before the flush, the expiry after it
    stored_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
    session(admin).flush(admin)
    stored_expiry = admin.get_tool_data('AuthPasswordReset', 'hash_expiry')

    assert_equal(stored_hash, 'generated_hash_value')
    assert_equal(stored_expiry, '04-08-2020')
    return admin
| |
def test_token_generator(self):
    """ Yields one token invalidation check per kind of email change.

    The scenarios cover: changing the primary address, claiming a new
    address and removing email addresses.
    :returns: pairs of email_change_invalidates_token and its parameters
    """
    change_primary = {
        'new_addr.addr': 'test_abcd@domain.net',
        'primary_addr': 'test@example.com',
    }
    claim_new = {
        'new_addr.addr': 'test@example.com',
        'new_addr.claim': 'Claim Address',
        'primary_addr': 'test-admin@users.localhost',
        'password': 'foo',
        'preferences.email_format': 'plain',
    }
    # remove test-admin@users.localhost
    remove_first = {
        'addr-1.ord': '1',
        'addr-1.delete': 'on',
        'addr-2.ord': '2',
        'new_addr.addr': '',
        'primary_addr': 'test-admin@users.localhost',
        'password': 'foo',
        'preferences.email_format': 'plain',
    }
    # remove the second address
    remove_second = {
        'addr-1.ord': '1',
        'addr-2.ord': '2',
        'addr-2.delete': 'on',
        'new_addr.addr': '',
        'primary_addr': 'test-admin@users.localhost',
    }

    for scenario in (change_primary, claim_new, remove_first, remove_second):
        yield self.email_change_invalidates_token, scenario
| |
def email_change_invalidates_token(self, change_params):
    """Post an email change as test-admin and check the reset token is cleared.

    :param change_params: form fields for ``/auth/preferences/update_emails``;
        the current ``_session_id`` is added to this dict in place
    """
    admin = self._create_password_reset_hash()
    session(admin).flush(admin)

    self.app.get('/').follow()  # establish session
    change_params['_session_id'] = self.app.cookies['_session_id']
    self.app.post('/auth/preferences/update_emails',
                  params=change_params,
                  extra_environ=dict(username='test-admin'))

    reloaded = M.User.by_username('test-admin')
    remaining_hash = reloaded.get_tool_data('AuthPasswordReset', 'hash')
    print(remaining_hash)
    assert_equal(remaining_hash, '')
    assert_equal(reloaded.get_tool_data('AuthPasswordReset', 'hash_expiry'), '')
| |
@td.with_user_project('test-admin')
def test_change_password(self):
    """Change test-admin's password and check the side effects.

    The change must be audited, the stored password must differ from the
    old one, the password reset token must be cleared and exactly one
    ``sendsimplemail`` task with subject "Password Changed" must be queued.
    """
    self.app.get('/').follow()  # establish session
    # start from a user that carries a password reset token
    admin = self._create_password_reset_hash()
    previous_password = admin.get_pref('password')

    change_form = {
        'oldpw': 'foo',
        'pw': 'asdfasdf',
        'pw2': 'asdfasdf',
        '_session_id': self.app.cookies['_session_id'],
    }
    with audits('Password changed', user=True):
        self.app.post('/auth/preferences/change_password',
                      extra_environ=dict(username='test-admin'),
                      params=change_form)

    # the stored password is no longer the old one
    assert_not_equal(previous_password, admin.get_pref('password'))

    # any existing reset token has been wiped
    for key in ('hash', 'hash_expiry'):
        assert_equal(admin.get_tool_data('AuthPasswordReset', key), '')

    # one notification mail has been queued
    mail_tasks = M.MonQTask.query.find(
        dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
    assert_equal(len(mail_tasks), 1)
    mail_kwargs = mail_tasks[0].kwargs
    assert_equal(mail_kwargs['subject'], 'Password Changed')
    assert_in('The password for your', mail_kwargs['text'])
| |
@patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
@td.with_user_project('test-admin')
def test_change_password_hibp(self):
    """Change the password with the HIBP check patched to be enabled.

    Choosing 'password' must produce 'Unsafe' in the response headers; a
    long random-looking password must not, and must actually replace the
    stored password.
    """
    self.app.get('/').follow()  # establish session
    # start from a user that carries a password reset token
    admin = self._create_password_reset_hash()
    previous_password = admin.get_pref('password')

    def change_to(new_password):
        # the session id is looked up for every request
        return self.app.post(
            '/auth/preferences/change_password',
            extra_environ=dict(username='test-admin'),
            params={
                'oldpw': 'foo',
                'pw': new_password,
                'pw2': new_password,
                '_session_id': self.app.cookies['_session_id'],
            })

    # attempt the change with a weak password
    weak = change_to('password')
    assert 'Unsafe' in str(weak.headers)

    strong = change_to('3j84rhoirwnoiwrnoiw')
    assert 'Unsafe' not in str(strong.headers)

    # the freshly loaded user carries a different password
    reloaded = M.User.by_username('test-admin')
    assert_not_equal(previous_password, reloaded.get_pref('password'))
| |
@td.with_user_project('test-admin')
def test_prefs(self):
    """Add and delete email addresses and rename the user via the preferences pages.

    Each change must produce the matching audit entry, and the preferences
    page and the ``email_address`` preference must reflect it.
    """
    emails_url = '/auth/preferences/update_emails'
    as_admin = dict(username='test-admin')

    page = self.app.get('/auth/preferences/', extra_environ=as_admin)
    # check preconditions of test data
    assert 'test@example.com' not in page
    assert 'test-admin@users.localhost' in page
    assert_equal(M.User.query.get(username='test-admin').get_pref('email_address'),
                 'test-admin@users.localhost')

    # add test@example
    add_params = {
        'new_addr.addr': 'test@example.com',
        'new_addr.claim': 'Claim Address',
        'primary_addr': 'test-admin@users.localhost',
        'password': 'foo',
        'preferences.email_format': 'plain',
        '_session_id': self.app.cookies['_session_id'],
    }
    with td.audits('New email address: test@example.com', user=True):
        self.app.post(emails_url, extra_environ=as_admin, params=add_params)
    page = self.app.get('/auth/preferences/')
    assert 'test@example.com' in page
    admin = M.User.query.get(username='test-admin')
    assert_equal(admin.get_pref('email_address'), 'test-admin@users.localhost')

    # remove test-admin@users.localhost
    delete_params = {
        'addr-1.ord': '1',
        'addr-1.delete': 'on',
        'addr-2.ord': '2',
        'new_addr.addr': '',
        'primary_addr': 'test-admin@users.localhost',
        'password': 'foo',
        'preferences.email_format': 'plain',
        '_session_id': self.app.cookies['_session_id'],
    }
    with td.audits('Email address deleted: test-admin@users.localhost', user=True):
        self.app.post(emails_url, extra_environ=as_admin, params=delete_params)
    page = self.app.get('/auth/preferences/')
    assert 'test-admin@users.localhost' not in page
    # with the primary address deleted, the email_address preference reads None
    admin = M.User.query.get(username='test-admin')
    assert_equal(admin.get_pref('email_address'), None)

    rename_params = {
        'preferences.display_name': 'Admin',
        '_session_id': self.app.cookies['_session_id'],
    }
    with td.audits('Display Name changed Test Admin => Admin', user=True):
        self.app.post('/auth/preferences/update',
                      params=rename_params,
                      extra_environ=as_admin)
| |
@td.with_user_project('test-admin')
def test_email_prefs_change_requires_password(self):
    """Every kind of email preference change needs the current password.

    Claiming an address, changing the primary address and deleting an
    address are each tried three times: without a password, with a wrong
    password and with the right one.  Only the last attempt may succeed.
    """
    self.app.get('/').follow()  # establish session

    def submit(params):
        return self.app.post('/auth/preferences/update_emails',
                             params=params,
                             extra_environ=dict(username='test-admin'))

    def primary_address():
        return M.User.by_username('test-admin').get_pref('email_address')

    # Claim new email
    claim_msg = 'You must provide your current password to claim new email'
    claim_params = {
        'new_addr.addr': 'test@example.com',
        'new_addr.claim': 'Claim Address',
        'primary_addr': 'test-admin@users.localhost',
        '_session_id': self.app.cookies['_session_id'],
    }
    resp = submit(claim_params)
    assert_in(claim_msg, self.webflash(resp))
    assert_not_in('test@example.com', resp.follow())

    claim_params['password'] = 'bad pass'
    resp = submit(claim_params)
    assert_in(claim_msg, self.webflash(resp))
    assert_not_in('test@example.com', resp.follow())

    claim_params['password'] = 'foo'  # valid password
    resp = submit(claim_params)
    assert_not_in(claim_msg, self.webflash(resp))
    assert_in('test@example.com', resp.follow())

    # Change primary address
    primary_msg = 'You must provide your current password to change primary address'
    primary_params = {
        'new_addr.addr': '',
        'primary_addr': 'test@example.com',
        '_session_id': self.app.cookies['_session_id'],
    }
    resp = submit(primary_params)
    assert_in(primary_msg, self.webflash(resp))
    assert_equal(primary_address(), 'test-admin@users.localhost')

    primary_params['password'] = 'bad pass'
    resp = submit(primary_params)
    assert_in(primary_msg, self.webflash(resp))
    assert_equal(primary_address(), 'test-admin@users.localhost')

    primary_params['password'] = 'foo'  # valid password
    self.app.get('/auth/preferences/')  # let previous 'flash' message cookie get used up
    resp = submit(primary_params)
    assert_not_in(primary_msg, self.webflash(resp))
    assert_equal(primary_address(), 'test@example.com')

    # Remove email
    delete_msg = 'You must provide your current password to delete an email'
    delete_params = {
        'addr-1.ord': '1',
        'addr-2.ord': '2',
        'addr-2.delete': 'on',
        'new_addr.addr': '',
        'primary_addr': 'test-admin@users.localhost',
        '_session_id': self.app.cookies['_session_id'],
    }
    resp = submit(delete_params)
    assert_in(delete_msg, self.webflash(resp))
    assert_in('test@example.com', resp.follow())

    delete_params['password'] = 'bad pass'
    resp = submit(delete_params)
    assert_in(delete_msg, self.webflash(resp))
    assert_in('test@example.com', resp.follow())

    delete_params['password'] = 'foo'  # valid password
    resp = submit(delete_params)
    assert_not_in(delete_msg, self.webflash(resp))
    assert_not_in('test@example.com', resp.follow())
| |
@td.with_user_project('test-admin')
def test_prefs_subscriptions(self):
    # The subscriptions page must show every existing subscription of the
    # user and offer every tool the user could still subscribe to.
    page = self.app.get('/auth/subscriptions/',
                        extra_environ={'username': 'test-admin'})

    # make sure page actually lists all the user's subscriptions
    mailbox_filter = {'user_id': c.user._id, 'is_flash': False}
    mailboxes = M.Mailbox.query.find(mailbox_filter).all()
    assert len(mailboxes) > 0, 'Test user has no subscriptions, cannot verify that they are shown'
    for mailbox in mailboxes:
        assert str(mailbox._id) in page, "Page doesn't list subscription for Mailbox._id = %s" % mailbox._id

    # make sure page lists all tools which user can subscribe
    # (tools named here have has_notifications=False and must stay hidden)
    silent_tools = ('activity', 'admin', 'search', 'userstats', 'profile')
    admin = M.User.query.get(username='test-admin')
    for project in admin.my_projects():
        for app_config in project.app_configs:
            if M.Mailbox.subscribed(project_id=project._id, app_config_id=app_config._id):
                # already subscribed: covered by the mailbox check above
                continue
            if app_config.tool_name in silent_tools:
                assert str(app_config._id) not in page, "Page lists tool %s but it should not" % app_config.tool_name
            else:
                assert str(app_config._id) in page, "Page doesn't list tool %s" % app_config.tool_name
| |
| |
@td.with_user_project('test-admin')
def test_update_user_notifications(self):
    # The 'mention_notifications' preference stays off until the form is
    # posted with ``allow_umnotif`` switched on.
    endpoint = '/auth/subscriptions/update_user_notifications'

    def mentions_enabled():
        admin = M.User.query.get(username='test-admin')
        return admin.get_pref('mention_notifications')

    def submit(**fields):
        # every POST carries the current session token
        fields['_session_id'] = self.app.cookies['_session_id']
        self.app.post(endpoint, params=fields)

    self.app.get('/').follow()  # establish session
    assert not mentions_enabled()

    submit()
    assert not mentions_enabled()

    submit(allow_umnotif='on')
    assert mentions_enabled()
| |
| |
def _find_subscriptions_form(self, r):
    """Return the subscriptions form contained in response ``r``.

    ``r.forms`` is expected to be a mapping whose values are form objects
    exposing an ``action`` attribute; the first one whose action is
    ``'update_subscriptions'`` is returned.

    Raises ``AssertionError`` when the response has no such form.
    """
    # ``values()`` exists on both Python 2 and Python 3 dicts, whereas
    # ``itervalues()`` is Python 2 only.
    for candidate in r.forms.values():
        if candidate.action == 'update_subscriptions':
            return candidate
    raise AssertionError("Can't find subscriptions form")
| |
def _find_subscriptions_field(self, form, subscribed=False):
    """Return the field-name prefix of a subscription row in ``form``.

    ``form.fields`` is expected to map field names to lists of field
    objects exposing a ``value`` attribute.  Only fields whose name ends
    with ``'.subscribed'`` are considered.  With ``subscribed=True`` a row
    whose checkbox value is ``'on'`` is wanted, otherwise a row whose
    value is anything else.

    The returned string is the field name with ``'.subscribed'`` removed.
    When several rows qualify, the last one in iteration order wins.

    Raises ``AssertionError`` when no row is in the requested state.
    """
    want_on = bool(subscribed)
    field_name = None
    # ``items()`` works on Python 2 and 3 alike (``iteritems()`` does not).
    for name, widgets in form.fields.items():
        # Skip unnamed and unrelated fields before touching their value.
        if not (name and name.endswith('.subscribed')):
            continue
        if (widgets[0].value == 'on') == want_on:
            field_name = name.replace('.subscribed', '')
    wanted_state = 'subscribed' if want_on else 'unsubscribed'
    assert field_name, "Can't find %s tool for user" % wanted_state
    return field_name
| |
@td.with_user_project('test-admin')
def test_prefs_subscriptions_subscribe(self):
    # Tick the checkbox of a tool the user is not subscribed to, submit the
    # form and verify that the subscription now exists.
    page = self.app.get('/auth/subscriptions/',
                        extra_environ={'username': 'test-admin'})
    subs_form = self._find_subscriptions_form(page)
    prefix = self._find_subscriptions_field(subs_form, subscribed=False)

    def first_field(suffix):
        return subs_form.fields[prefix + suffix][0]

    tool_id = ObjectId(first_field('.tool_id').value)
    project_id = ObjectId(first_field('.project_id').value)

    def is_subscribed():
        return M.Mailbox.subscribed(project_id=project_id, app_config_id=tool_id)

    assert not is_subscribed(), "User already subscribed for tool %s" % tool_id
    first_field('.subscribed').value = 'on'
    subs_form.submit()
    assert is_subscribed(), "User is not subscribed for tool %s" % tool_id
| |
@td.with_user_project('test-admin')
def test_prefs_subscriptions_unsubscribe(self):
    # Untick the checkbox of an existing subscription, submit the form and
    # verify that the corresponding Mailbox record is gone.
    page = self.app.get('/auth/subscriptions/',
                        extra_environ={'username': 'test-admin'})
    subs_form = self._find_subscriptions_form(page)
    prefix = self._find_subscriptions_field(subs_form, subscribed=True)
    mailbox_id = ObjectId(subs_form.fields[prefix + '.subscription_id'][0].value)

    def load_mailbox():
        return M.Mailbox.query.get(_id=mailbox_id)

    assert load_mailbox(), "User has not subscription with Mailbox._id = %s" % mailbox_id
    subs_form.fields[prefix + '.subscribed'][0].value = None
    subs_form.submit()
    assert not load_mailbox(), "User still has subscription with Mailbox._id %s" % mailbox_id
| |
def test_format_email(self):
    # After saving an email format, the subscriptions page must render the
    # matching <option> as selected.
    expectations = (
        ('plain', '<option selected value="plain">Plain Text</option>'),
        ('both', '<option selected value="both">HTML</option>'),
    )
    self.app.get('/').follow()  # establish session
    for email_format, selected_option in expectations:
        self.app.post('/auth/subscriptions/update_subscriptions',
                      params={'email_format': email_format,
                              'subscriptions': '',
                              '_session_id': self.app.cookies['_session_id']})
        page = self.app.get('/auth/subscriptions/')
        assert selected_option in page
| |
def test_create_account(self):
    # Walk through registration: validation errors, a successful sign-up,
    # a duplicate username, and finally a login with the new credentials.
    def session_id():
        return self.app.cookies['_session_id']

    def register_aaa():
        account = {
            'username': 'aaa',
            'pw': '12345678',
            'pw2': '12345678',
            'display_name': 'Test Me',
            '_session_id': session_id(),
        }
        return self.app.post('/auth/save_new', params=account)

    form_page = self.app.get('/auth/create_account')
    assert 'Create an Account' in form_page

    # upper-case username and too-short password are both rejected
    rejected = self.app.post('/auth/save_new',
                             params={'username': 'AAA',
                                     'pw': '123',
                                     '_session_id': session_id()})
    username_rule = ('Usernames must include only small letters, numbers, '
                     'and dashes. They must also start with a letter and be '
                     'at least 3 characters long.')
    assert_in('Enter a value 6 characters long or more', rejected)
    assert_in(username_rule, rejected)

    # valid data registers the account
    welcome = register_aaa().follow().follow()
    assert 'User "aaa" registered' in unentity(welcome.body)

    # the same username cannot be registered twice
    duplicate = register_aaa()
    assert 'That username is already taken. Please choose another.' in duplicate

    # the new credentials can be used to log in (login answers with a redirect)
    self.app.get('/auth/logout')
    self.app.post('/auth/do_login',
                  params={'username': 'aaa',
                          'password': '12345678',
                          '_session_id': session_id()},
                  antispam=True,
                  status=302)
| |
def test_create_account_require_email(self):
    # With 'auth.require_email_addr' off a new account is active at once
    # and gets its user project; with it on the account stays pending and
    # no user project exists yet.
    def register(username):
        self.app.post('/auth/save_new',
                      params={'username': username,
                              'pw': '12345678',
                              'pw2': '12345678',
                              'display_name': 'Test Me',
                              'email': 'test@example.com',
                              '_session_id': self.app.cookies['_session_id']})

    def user_project_count(project_name):
        return M.Project.query.find({'name': project_name}).count()

    self.app.get('/').follow()  # establish session

    with h.push_config(config, **{'auth.require_email_addr': 'false'}):
        register('aaa')
        created = M.User.query.get(username='aaa')
        assert not created.pending
        assert_equal(user_project_count('u/aaa'), 1)

    with h.push_config(config, **{'auth.require_email_addr': 'true'}):
        register('bbb')
        created = M.User.query.get(username='bbb')
        assert created.pending
        assert_equal(user_project_count('u/bbb'), 0)
| |
def test_verify_email(self):
    # Register with mandatory email verification, then follow the
    # verification link and check the account becomes fully active.
    address = 'test@example.com'

    def load_account():
        # re-read both records so each check sees the stored state
        account = M.User.query.get(username='aaa')
        claim = M.EmailAddress.get(email=address)
        return account, claim

    with h.push_config(config, **{'auth.require_email_addr': 'true'}):
        self.app.get('/').follow()  # establish session
        signup = self.app.post(
            '/auth/save_new',
            params={'username': 'aaa',
                    'pw': '12345678',
                    'pw2': '12345678',
                    'display_name': 'Test Me',
                    'email': address,
                    '_session_id': self.app.cookies['_session_id']})
        signup.follow()

        account, claim = load_account()
        assert account._id == claim.claimed_by_user_id

        self.app.get('/auth/verify_addr', params={'a': claim.nonce})

        account, claim = load_account()
        assert not account.pending
        assert claim.confirmed
        assert account.get_pref('email_address')
        assert_equal(M.Project.query.find({'name': 'u/aaa'}).count(), 1)
| |
def test_create_account_disabled_header_link(self):
    # With registration switched off the home page must not mention 'Register'.
    registration_off = {'auth.allow_user_registration': 'false'}
    with h.push_config(config, **registration_off):
        home_page = self.app.get('/')
        assert 'Register' not in home_page
| |
def test_create_account_disabled_form_gone(self):
    # With registration switched off the sign-up form answers 404.
    registration_off = {'auth.allow_user_registration': 'false'}
    with h.push_config(config, **registration_off):
        not_found = self.app.get('/auth/create_account', status=404)
        assert 'Create an Account' not in not_found
| |
def test_create_account_disabled_submit_fails(self):
    # With registration switched off, posting the sign-up data answers 404.
    registration_off = {'auth.allow_user_registration': 'false'}
    with h.push_config(config, **registration_off):
        self.app.get('/').follow()  # establish session
        signup_fields = {
            'username': 'aaa',
            'pw': '12345678',
            'pw2': '12345678',
            'display_name': 'Test Me',
            '_session_id': self.app.cookies['_session_id'],
        }
        self.app.post('/auth/save_new', params=signup_fields, status=404)
| |
def test_one_project_role(self):
    """Make sure when a user goes to a new project only one project role is created.
    There was an issue with extra project roles getting created if a user went directly to
    an admin page."""
    neighborhood = M.Neighborhood.query.get(name='Projects')
    project = M.Project.query.get(shortname='test',
                                  neighborhood_id=neighborhood._id)

    # register a fresh user and activate the account directly in the DB
    self.app.get('/').follow()  # establish session
    signup_fields = {
        'username': 'aaa',
        'pw': '12345678',
        'pw2': '12345678',
        'display_name': 'Test Me',
        'email': 'test@example.com',
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post('/auth/save_new', params=signup_fields).follow()
    newcomer = M.User.query.get(username='aaa')
    newcomer.pending = False
    session(newcomer).flush(newcomer)

    def role_count():
        role_filter = {'user_id': newcomer._id, 'project_id': project._id}
        return M.ProjectRole.query.find(role_filter).count()

    # no role before the first visit ...
    assert role_count() == 0

    # ... and at most one after hitting an admin page directly (forbidden)
    self.app.get('/p/test/admin/permissions',
                 extra_environ={'username': 'aaa'},
                 status=403)
    assert role_count() <= 1
| |
def test_default_lookup(self):
    # An unknown path below /auth/ must end in the default _lookup(),
    # which answers 404.
    unknown_auth_page = '/auth/foobar'
    self.app.get(unknown_auth_page, status=404)
| |
def test_disabled_user(self):
    # An enabled user reaches the admin page; once disabled, the same
    # request is redirected to the login page.
    def visit_admin():
        return self.app.get('/p/test/admin/',
                            extra_environ={'username': 'test-admin'})

    account = M.User.query.get(username='test-admin')
    orm_session = session(account)
    assert not account.disabled
    allowed = visit_admin()
    assert_equal(allowed.status_int, 200, 'Redirect to %s' % allowed.location)

    # disable the account and persist the change
    account.disabled = True
    orm_session.save(account)
    orm_session.flush()

    account = M.User.query.get(username='test-admin')
    assert account.disabled
    bounced = visit_admin()
    assert_equal(bounced.status_int, 302)
    assert_equal(bounced.location,
                 'http://localhost/auth/?return_to=%2Fp%2Ftest%2Fadmin%2F')
| |
def test_no_open_return_to(self):
    # ``return_to`` may point at a local path or a same-host URL; anything
    # aimed at another host must end up on the dashboard instead.
    def login(return_to):
        credentials = {
            'username': 'test-user',
            'password': 'foo',
            'return_to': return_to,
            '_session_id': self.app.cookies['_session_id'],
        }
        return self.app.post('/auth/do_login', params=credentials, antispam=True)

    # relative path is honoured
    self.app.get('/auth/logout').follow().follow()
    assert_equal(login('/foo').location, 'http://localhost/foo')

    # absolute URL on the same host is honoured
    self.app.get('/auth/logout')
    assert_equal(login('http://localhost/foo').location, 'http://localhost/foo')

    # foreign hosts (absolute or protocol-relative) are not
    for foreign_target in ('http://example.com/foo', '//example.com/foo'):
        self.app.get('/auth/logout')
        landing = login(foreign_target).follow()
        assert_equal(landing.location, 'http://localhost/dashboard')
| |
def test_no_injected_headers_in_return_to(self):
    # A newline smuggled into ``return_to`` must not inject a response
    # header; the login redirects to the site root instead.
    self.app.get('/auth/logout').follow().follow()
    credentials = {
        'username': 'test-user',
        'password': 'foo',
        # WebTest actually will raise an error if there's an invalid header (webob itself does not)
        'return_to': '/foo\nContent-Length: 777',
        '_session_id': self.app.cookies['_session_id'],
    }
    redirect = self.app.post('/auth/do_login', params=credentials, antispam=True)
    assert_equal(redirect.location, 'http://localhost/')
    assert_not_equal(redirect.content_length, 777)
| |
| |
class TestPreferences(TestController):
    # Functional tests for the /auth/user_info/* and /auth/preferences/*
    # pages.  Each test posts form data and then re-reads 'test-admin' from
    # the database to check what was stored.

    @td.with_user_project('test-admin')
    def test_personal_data(self):
        from pytz import country_names

        expected = {
            'sex': 'Male',
            'birthdate': '19/08/1988',
            'country': 'IT',
            'city': 'Milan',
            'timezone': 'Europe/Rome',
        }

        def submit(**fields):
            fields['_session_id'] = self.app.cookies['_session_id']
            return self.app.post('/auth/user_info/change_personal_data',
                                 params=fields)

        def check_stored_profile():
            # the stored profile must match the values submitted first
            user = M.User.query.get(username='test-admin')
            assert user.sex == expected['sex']
            assert user.birthdate.strftime('%d/%m/%Y') == expected['birthdate']
            assert country_names.get(expected['country']) == user.localization.country
            assert user.localization.city == expected['city']
            assert user.timezone == expected['timezone']

        self.app.get('/auth/user_info/')

        # Check if personal data is properly set
        submit(**expected)
        check_stored_profile()

        # Check if setting a wrong date everything works correctly
        rejected = submit(birthdate='30/02/1998')
        assert 'Please enter a valid date' in str(rejected)
        check_stored_profile()

        # Check deleting birthdate
        submit(**dict(expected, birthdate=''))
        assert M.User.query.get(username='test-admin').birthdate is None

    @td.with_user_project('test-admin')
    def test_contacts(self):
        contacts_url = '/auth/user_info/contacts/'

        def post(action, **fields):
            fields['_session_id'] = self.app.cookies['_session_id']
            self.app.post(contacts_url + action, params=fields)

        def admin():
            return M.User.query.get(username='test-admin')

        # Add skype account
        skype_name = 'testaccount'
        self.app.get(contacts_url)
        post('skype_account', skypeaccount=skype_name)
        assert admin().skypeaccount == skype_name

        # Add social network account
        facebook = {'socialnetwork': 'Facebook',
                    'accounturl': 'http://www.facebook.com/test'}
        post('add_social_network', **facebook)
        networks = admin().socialnetworks
        assert len(networks) == 1
        assert_equal(networks[0].socialnetwork, facebook['socialnetwork'])
        assert_equal(networks[0].accounturl, facebook['accounturl'])

        # Add second social network account; it is submitted as the handle
        # '@test' and expected back as the full URL below
        twitter = {'socialnetwork': 'Twitter',
                   'accounturl': 'http://twitter.com/test'}
        post('add_social_network',
             socialnetwork=twitter['socialnetwork'],
             accounturl='@test')
        networks = admin().socialnetworks
        assert len(networks) == 2
        assert_in(facebook, networks)
        assert_in(twitter, networks)

        # Remove first social network account
        post('remove_social_network',
             socialnetwork=facebook['socialnetwork'],
             account=facebook['accounturl'])
        networks = admin().socialnetworks
        assert len(networks) == 1
        assert_in(twitter, networks)

        # Add empty, then invalid, social network account: neither is stored
        for rejected_network in ('', 'invalid'):
            post('add_social_network',
                 accounturl=facebook['accounturl'],
                 socialnetwork=rejected_network)
            networks = admin().socialnetworks
            assert len(networks) == 1
            assert_in(twitter, networks)

        # Add telephone number
        first_number = '+3902123456'
        post('add_telnumber', newnumber=first_number)
        numbers = admin().telnumbers
        assert len(numbers) == 1 and numbers[0] == first_number

        # Add second telephone number
        second_number = '+3902654321'
        post('add_telnumber', newnumber=second_number)
        numbers = admin().telnumbers
        assert len(numbers) == 2 and first_number in numbers and second_number in numbers

        # Remove first telephone number
        post('remove_telnumber', oldvalue=first_number)
        numbers = admin().telnumbers
        assert len(numbers) == 1 and second_number in numbers

        # Add website
        first_site = 'http://www.testurl.com'
        post('add_webpage', newwebsite=first_site)
        pages = admin().webpages
        assert len(pages) == 1 and first_site in pages

        # Add second website
        second_site = 'http://www.testurl2.com'
        post('add_webpage', newwebsite=second_site)
        pages = admin().webpages
        assert len(pages) == 2 and first_site in pages and second_site in pages

        # Remove first website
        post('remove_webpage', oldvalue=first_site)
        pages = admin().webpages
        assert len(pages) == 1 and second_site in pages

    @td.with_user_project('test-admin')
    def test_availability(self):
        availability_url = '/auth/user_info/availability/'

        def send(action, day, begin, end):
            return self.app.post(availability_url + action, params={
                'weekday': day,
                'starttime': begin.strftime('%H:%M'),
                'endtime': end.strftime('%H:%M'),
                '_session_id': self.app.cookies['_session_id'],
            })

        def admin():
            return M.User.query.get(username='test-admin')

        monday = dict(week_day='Monday',
                      start_time=time(9, 0, 0), end_time=time(12, 0, 0))
        tuesday = dict(week_day='Tuesday',
                       start_time=time(14, 0, 0), end_time=time(16, 0, 0))

        # Add availability timeslot
        self.app.get(availability_url)
        send('add_timeslot',
             monday['week_day'], monday['start_time'], monday['end_time'])
        user = admin()
        assert len(user.availability) == 1 and monday in user.get_availability_timeslots()

        # Add second availability timeslot
        send('add_timeslot',
             tuesday['week_day'], tuesday['start_time'], tuesday['end_time'])
        user = admin()
        assert len(user.availability) == 2
        assert_in(monday, user.get_availability_timeslots())
        assert_in(tuesday, user.get_availability_timeslots())

        # Remove availability timeslot
        send('remove_timeslot',
             monday['week_day'], monday['start_time'], monday['end_time'])
        user = admin()
        assert len(user.availability) == 1 and tuesday in user.get_availability_timeslots()

        # Add invalid availability timeslot (start and end swapped)
        rejected = send('add_timeslot',
                        tuesday['week_day'], tuesday['end_time'], tuesday['start_time'])
        assert 'Invalid period:' in str(rejected)
        user = admin()
        assert len(user.availability) == 1 and tuesday in user.get_availability_timeslots()

    @td.with_user_project('test-admin')
    def test_inactivity(self):
        availability_url = '/auth/user_info/availability/'

        def as_form_date(day):
            return day.strftime('%d/%m/%Y')

        def send(action, start_text, end_text):
            return self.app.post(availability_url + action, params={
                'startdate': start_text,
                'enddate': end_text,
                '_session_id': self.app.cookies['_session_id'],
            })

        def admin():
            return M.User.query.get(username='test-admin')

        # all periods are whole days counted from today's UTC midnight
        today = datetime.utcnow().date()
        midnight = datetime(today.year, today.month, today.day)
        first = dict(start_date=midnight + timedelta(days=1),
                     end_date=midnight + timedelta(days=7))
        second = dict(start_date=midnight + timedelta(days=24),
                      end_date=midnight + timedelta(days=28))

        # Add inactivity period
        self.app.get(availability_url)
        send('add_inactive_period',
             as_form_date(first['start_date']), as_form_date(first['end_date']))
        user = admin()
        assert len(user.inactiveperiod) == 1 and first in user.get_inactive_periods()

        # Add second inactivity period
        send('add_inactive_period',
             as_form_date(second['start_date']), as_form_date(second['end_date']))
        user = admin()
        assert len(user.inactiveperiod) == 2
        assert_in(first, user.get_inactive_periods())
        assert_in(second, user.get_inactive_periods())

        # Remove first inactivity period
        send('remove_inactive_period',
             as_form_date(first['start_date']), as_form_date(first['end_date']))
        user = admin()
        assert len(user.inactiveperiod) == 1 and second in user.get_inactive_periods()

        # Add invalid inactivity period
        rejected = send('add_inactive_period',
                        'NOT/A/DATE', as_form_date(second['end_date']))
        user = admin()
        assert 'Please enter a valid date' in str(rejected)
        assert len(user.inactiveperiod) == 1 and second in user.get_inactive_periods()

    @td.with_user_project('test-admin')
    def test_skills(self):
        setup_trove_categories()
        skills_url = '/auth/user_info/skills/'
        category = M.TroveCategory.query.get(show_as_skill=True)

        def save(level, comment):
            self.app.post(skills_url + 'save_skill', params={
                'level': level,
                'comment': comment,
                'selected_skill': str(category.trove_cat_id),
                '_session_id': self.app.cookies['_session_id'],
            })

        def has_single_skill(level, comment):
            # True when exactly one skill is stored and it matches
            wanted = dict(category_id=category._id, comment=comment, level=level)
            skills = M.User.query.get(username='test-admin').skills
            return len(skills) == 1 and wanted in skills

        # Add a skill
        self.app.get(skills_url)
        save('low', 'test comment')
        assert has_single_skill('low', 'test comment')

        # Add again the same skill: the existing entry is updated
        self.app.get(skills_url)
        save('medium', 'test comment 2')
        assert has_single_skill('medium', 'test comment 2')

        # Add an invalid skill
        save('not a level', 'test comment 2')
        # Check that everything is as it was before
        assert has_single_skill('medium', 'test comment 2')

        # Remove a skill
        self.app.get(skills_url)
        self.app.post(skills_url + 'remove_skill', params={
            'categoryid': str(category.trove_cat_id),
            '_session_id': self.app.cookies['_session_id'],
        })
        assert len(M.User.query.get(username='test-admin').skills) == 0

    @td.with_user_project('test-admin')
    def test_user_message(self):
        # Posting without 'allow_user_messages' disables user messages;
        # posting it as 'on' enables them again.
        def messages_disabled():
            admin = M.User.query.get(username='test-admin')
            return admin.get_pref('disable_user_messages')

        def submit(**fields):
            fields['_session_id'] = self.app.cookies['_session_id']
            self.app.post('/auth/preferences/user_message', params=fields)

        self.app.get('/').follow()  # establish session
        assert not messages_disabled()

        submit()
        assert messages_disabled()

        submit(allow_user_messages='on')
        assert not messages_disabled()

    @td.with_user_project('test-admin')
    def test_additional_page(self):
        # A preferences provider can add pages below /auth/, but only its
        # exposed methods are reachable.
        class ExtraPagesProvider(plugin.UserPreferencesProvider):
            def not_page(self):
                return 'not page'

            @expose()
            def new_page(self):
                return 'new page'

        with mock.patch.object(plugin.UserPreferencesProvider, 'get') as provider_get:
            provider_get.return_value = ExtraPagesProvider()
            exposed = self.app.get('/auth/new_page')
            assert_equal(exposed.body, 'new page')
            self.app.get('/auth/not_page', status=404)
| |
| |
| class TestPasswordReset(TestController): |
| test_primary_email = 'testprimaryaddr@mail.com' |
| |
def setUp(self):
    # Run the regular controller set-up, then turn off the test harness'
    # automatic authentication.
    super(TestPasswordReset, self).setUp()
    # so test-admin isn't automatically logged in for all requests
    self.app.extra_environ = dict(disable_auth_magic='True')
| |
@patch('allura.tasks.mail_tasks.sendmail')
@patch('allura.lib.helpers.gen_message_id')
def test_email_unconfirmed(self, gen_message_id, sendmail):
    # Requesting recovery for an unconfirmed address must not create a
    # reset hash.
    admin = M.User.query.get(username='test-admin')
    address = M.EmailAddress.find({'claimed_by_user_id': admin._id}).first()
    address.confirmed = False
    ThreadLocalORMSession.flush_all()

    self.app.get('/').follow()  # establish session
    recovery_form = {
        'email': address.email,
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post('/auth/password_recovery_hash', recovery_form)

    reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
    assert reset_hash is None
| |
@patch('allura.tasks.mail_tasks.sendmail')
@patch('allura.lib.helpers.gen_message_id')
def test_user_disabled(self, gen_message_id, sendmail):
    # Requesting recovery for a disabled user must not create a reset hash.
    admin = M.User.query.get(username='test-admin')
    address = M.EmailAddress.find({'claimed_by_user_id': admin._id}).first()
    admin.disabled = True
    ThreadLocalORMSession.flush_all()

    self.app.get('/').follow()  # establish session
    recovery_form = {
        'email': address.email,
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post('/auth/password_recovery_hash', recovery_form)

    reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
    assert reset_hash is None
| |
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_only_primary_email_reset_allowed(self, gen_message_id, sendmail):
    # With non-primary resets switched off, a request for the primary
    # address creates a hash and the mail goes to that address.
    primary = self.test_primary_email

    self.app.get('/').follow()  # establish session
    admin = M.User.query.get(username='test-admin')
    admin.claim_address(primary)
    admin.set_pref('email_address', primary)

    claim = M.EmailAddress.find({'email': primary}).first()
    claim.confirmed = True
    ThreadLocalORMSession.flush_all()

    primary_only = {'auth.allow_non_primary_email_password_reset': 'false'}
    with h.push_config(config, **primary_only):
        recovery_form = {
            'email': primary,
            '_session_id': self.app.cookies['_session_id'],
        }
        self.app.post('/auth/password_recovery_hash', recovery_form)
        reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
        assert reset_hash is not None
        _, mail_kwargs = sendmail.post.call_args
        assert_equal(mail_kwargs['toaddr'], primary)
| |
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_non_primary_email_reset_allowed(self, gen_message_id, sendmail):
    # With non-primary resets switched on, a request for a non-primary
    # address creates a hash and the mail goes to that address.
    primary = self.test_primary_email

    self.app.get('/').follow()  # establish session
    admin = M.User.query.get(username='test-admin')
    # looked up before the new primary address is claimed
    secondary = M.EmailAddress.find({'claimed_by_user_id': admin._id}).first()
    admin.claim_address(primary)
    admin.set_pref('email_address', primary)
    claim = M.EmailAddress.find({'email': primary}).first()
    claim.confirmed = True
    ThreadLocalORMSession.flush_all()

    any_address = {'auth.allow_non_primary_email_password_reset': 'true'}
    with h.push_config(config, **any_address):
        recovery_form = {
            'email': secondary.email,
            '_session_id': self.app.cookies['_session_id'],
        }
        self.app.post('/auth/password_recovery_hash', recovery_form)
        reset_hash = admin.get_tool_data('AuthPasswordReset', 'hash')
        assert reset_hash is not None
        _, mail_kwargs = sendmail.post.call_args
        assert_equal(mail_kwargs['toaddr'], secondary.email)
| |
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_password_reset(self, gen_message_id, sendmail):
    # Full password recovery flow for 'test-admin': request a reset link,
    # check the mail that would be sent, set a new password through the
    # link, verify the reset data is cleared, and log in with the new
    # password.
    self.app.get('/').follow()  # establish session
    user = M.User.query.get(username='test-admin')
    email = M.EmailAddress.find({'claimed_by_user_id': user._id}).first()
    email.confirmed = True
    ThreadLocalORMSession.flush_all()
    old_pw_hash = user.password

    # request a reset
    with td.audits('Password recovery link sent to: ' + email.email, user=True):
        r = self.app.post('/auth/password_recovery_hash', {'email': email.email,
                                                           '_session_id': self.app.cookies['_session_id'],
                                                           })
    # confirm some fields
    # (named reset_hash so the builtin ``hash`` is not shadowed)
    reset_hash = user.get_tool_data('AuthPasswordReset', 'hash')
    reset_hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')
    assert reset_hash is not None
    assert reset_hash_expiry is not None

    # confirm email sent
    text = '''Your username is test-admin

To update your password on %s, please visit the following URL:

%s/auth/forgotten_password/%s''' % (config['site_name'], config['base_url'], reset_hash)
    sendmail.post.assert_called_once_with(
        sender='noreply@localhost',
        toaddr=email.email,
        fromaddr=u'"{}" <{}>'.format(config['site_name'], config['forgemail.return_path']),
        reply_to=config['forgemail.return_path'],
        subject='Allura Password recovery',
        message_id=gen_message_id(),
        text=text)

    # load reset form and fill it out
    r = self.app.get('/auth/forgotten_password/%s' % reset_hash)
    assert_in('Enter a new password for: test-admin', r)
    assert_in('New Password:', r)
    assert_in('New Password (again):', r)
    form = r.forms[0]
    form['pw'] = form['pw2'] = new_password = '154321'
    # The audit message is matched as a regex, so the parentheses are
    # escaped.  A raw string keeps the backslashes without relying on
    # invalid escape sequences in a normal string literal.
    with td.audits(r'Password changed \(through recovery process\)', user=True):
        r = form.submit()

    # confirm password changed and works
    user = M.User.query.get(username='test-admin')
    assert_not_equal(old_pw_hash, user.password)
    provider = plugin.LocalAuthenticationProvider(None)
    assert_true(provider._validate_password(user, new_password))

    # confirm reset fields cleared
    user = M.User.query.get(username='test-admin')
    reset_hash = user.get_tool_data('AuthPasswordReset', 'hash')
    reset_hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')
    assert_equal(reset_hash, '')
    assert_equal(reset_hash_expiry, '')

    # confirm can log in now in same session
    r = r.follow()
    assert 'Log Out' not in r, r
    form = r.forms[0]
    # the login form uses antispam-encoded field names
    encoded = self.app.antispam_field_names(r.form)
    form[encoded['username']] = 'test-admin'
    form[encoded['password']] = new_password
    r = form.submit(status=302)
    r = r.follow().follow()
    assert 'Log Out' in r, r
| |
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_hash_expired(self, gen_message_id, sendmail):
    """A recovery hash whose expiry lies in the past is rejected.

    Both loading the reset form and posting a new password must redirect
    to a page carrying the "Unable to process reset" message.
    """
    failure_msg = 'Unable to process reset, please try again'

    account = M.User.query.get(username='test-admin')
    address = M.EmailAddress.find({'claimed_by_user_id': account._id}).first()
    address.confirmed = True
    ThreadLocalORMSession.flush_all()

    self.app.get('/').follow()  # establish session
    recovery_params = {
        'email': address.email,
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post('/auth/password_recovery_hash', recovery_params)

    # reload the user, then push the expiry date far into the past
    account = M.User.by_username('test-admin')
    reset_hash = account.get_tool_data('AuthPasswordReset', 'hash')
    account.set_tool_data('AuthPasswordReset',
                          hash_expiry=datetime(2000, 10, 10))

    form_resp = self.app.get('/auth/forgotten_password/%s' % reset_hash.encode('utf-8'))
    assert_in(failure_msg, form_resp.follow().body)

    set_url = '/auth/set_new_password/%s' % reset_hash.encode('utf-8')
    set_params = {
        'pw': '154321',
        'pw2': '154321',
        '_session_id': self.app.cookies['_session_id'],
    }
    set_resp = self.app.post(set_url, set_params)
    assert_in(failure_msg, set_resp.follow().body)
| |
@patch('allura.lib.plugin.AuthenticationProvider')
def test_provider_disabled(self, AP):
    """All password-recovery URLs answer 404 when the provider's
    ``forgotten_password_process`` flag is False."""
    admin = M.User.query.get(username='test-admin')

    # configure the mocked provider: recovery off, requests resolve to test-admin
    provider = AP.get()
    provider.forgotten_password_process = False
    provider.authenticate_request()._id = admin._id
    provider.by_username().username = admin.username

    self.app.get('/auth/forgotten_password', status=404)

    self.app.get('/').follow()  # establish session
    new_pw_params = {
        'pw': 'foo',
        'pw2': 'foo',
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post('/auth/set_new_password', new_pw_params, status=404)

    recovery_params = {
        'email': 'foo',
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post('/auth/password_recovery_hash', recovery_params, status=404)
| |
@patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
@patch('allura.tasks.mail_tasks.sendsimplemail')
@patch('allura.lib.helpers.gen_message_id')
def test_pwd_reset_hibp_check(self, gen_message_id, sendmail):
    """Password reset with the HIBP check patched on.

    Submitting 'password' must yield 'Unsafe' in the response headers; a
    long random-looking password must not, and must then work for login.
    """
    self.app.get('/').follow()  # establish session
    account = M.User.query.get(username='test-admin')
    address = M.EmailAddress.find({'claimed_by_user_id': account._id}).first()
    address.confirmed = True
    ThreadLocalORMSession.flush_all()

    # request a reset
    recovery_params = {
        'email': address.email,
        '_session_id': self.app.cookies['_session_id'],
    }
    self.app.post('/auth/password_recovery_hash', recovery_params)
    reset_hash = account.get_tool_data('AuthPasswordReset', 'hash')

    # load reset form and fill it out with weak password
    resp = self.app.get('/auth/forgotten_password/%s' % reset_hash)
    reset_form = resp.forms[0]
    new_password = 'password'
    reset_form['pw'] = new_password
    reset_form['pw2'] = new_password
    resp = reset_form.submit()
    assert 'Unsafe' in str(resp.headers)

    # fill it out again, with a stronger password
    resp = resp.follow()
    reset_form = resp.forms[0]
    new_password = 'oj35h9u34280j924hnuiw'  # something unlikely to trip at hibp
    reset_form['pw'] = new_password
    reset_form['pw2'] = new_password
    resp = reset_form.submit()
    assert 'Unsafe' not in str(resp.headers)

    # confirm password changed and works
    account = M.User.query.get(username='test-admin')
    local_provider = plugin.LocalAuthenticationProvider(None)
    assert_true(local_provider._validate_password(account, new_password))

    # confirm can log in now in same session
    resp = resp.follow()
    assert 'Log Out' not in resp, resp
    login_form = resp.forms[0]
    field_names = self.app.antispam_field_names(resp.form)
    login_form[field_names['username']] = 'test-admin'
    login_form[field_names['password']] = new_password
    resp = login_form.submit(status=302)
    resp = resp.follow().follow()
    assert 'Log Out' in resp, resp
| |
| |
class TestOAuth(TestController):
    """Functional tests for OAuth application management under ``/auth/oauth/``
    and the token endpoints under ``/rest/oauth/``.

    Every token fixture uses the literal key ``'api_key'``.
    """

    def _make_consumer(self, username='test-admin', **fields):
        """Create (without flushing) a consumer token keyed 'api_key' for *username*.

        Extra keyword arguments are passed to ``M.OAuthConsumerToken``.
        Returns ``(user, consumer_token)``.
        """
        user = M.User.by_username(username)
        ctok = M.OAuthConsumerToken(api_key='api_key', user_id=user._id, **fields)
        return user, ctok

    def _make_request_token(self, callback, **fields):
        """Create a described consumer token plus a request token for
        test-admin with the given *callback*, then flush everything.

        Extra keyword arguments are passed to ``M.OAuthRequestToken``.
        """
        user, ctok = self._make_consumer(description='ctok_desc')
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback=callback,
            user_id=user._id,
            **fields
        )
        ThreadLocalORMSession.flush_all()

    def test_register_deregister_app(self):
        """An app can be registered and then removed via the first form's 'deregister' action."""
        # register
        self.app.get('/auth/oauth/')
        r = self.app.post('/auth/oauth/register',
                          params={'application_name': 'oautstapp', 'application_description': 'Oauth rulez',
                                  '_session_id': self.app.cookies['_session_id'],
                                  }).follow()
        assert 'oautstapp' in r
        # deregister
        assert_equal(r.forms[0].action, 'deregister')
        r.forms[0].submit()
        r = self.app.get('/auth/oauth/')
        assert 'oautstapp' not in r

    def test_generate_revoke_access_token(self):
        """A bearer token can be generated for a registered app and later revoked."""
        # generate
        self.app.get('/').follow()  # establish session
        r = self.app.post('/auth/oauth/register',
                          params={'application_name': 'oautstapp', 'application_description': 'Oauth rulez',
                                  '_session_id': self.app.cookies['_session_id'],
                                  }).follow()
        assert_equal(r.forms[1].action, 'generate_access_token')
        r.forms[1].submit()
        r = self.app.get('/auth/oauth/')
        assert 'Bearer Token:' in r
        assert_not_equal(
            M.OAuthAccessToken.for_user(M.User.by_username('test-admin')), [])
        # revoke
        assert_equal(r.forms[0].action, 'revoke_access_token')
        r.forms[0].submit()
        r = self.app.get('/auth/oauth/')
        assert_not_equal(r.forms[0].action, 'revoke_access_token')
        assert_equal(
            M.OAuthAccessToken.for_user(M.User.by_username('test-admin')), [])

    # ``consumer`` is replaced only for the duration of the test; patch.object
    # restores the original class attribute afterwards so other tests are not
    # affected.
    @mock.patch.object(M.OAuthConsumerToken, 'consumer', mock.Mock())
    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_interactive(self, Request, Server):
        """Full three-step flow: request token, authorize with 'yes', access token."""
        self._make_consumer(description='ctok_desc')
        ThreadLocalORMSession.flush_all()
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_callback': 'http://my.domain.com/callback',
        }
        r = self.app.post('/rest/oauth/request_token', params={})
        rtok = parse_qs(r.body)['oauth_token'][0]
        r = self.app.post('/rest/oauth/authorize',
                          params={'oauth_token': rtok})
        r = r.forms[0].submit('yes')
        assert r.location.startswith('http://my.domain.com/callback')
        pin = parse_qs(urlparse(r.location).query)['oauth_verifier'][0]
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': rtok,
            'oauth_verifier': pin,
        }
        r = self.app.get('/rest/oauth/access_token')
        atok = parse_qs(r.body)
        assert_equal(len(atok['oauth_token']), 1)
        assert_equal(len(atok['oauth_token_secret']), 1)

    # Scoped replacement of ``consumer`` (restored on exit); the assertion on
    # verify_request below compares against this same mock object.
    @mock.patch.object(M.OAuthConsumerToken, 'consumer', mock.Mock())
    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_valid(self, Request, Server):
        """A valid request is parsed, verified, and answered with the stored request token."""
        user, consumer_token = self._make_consumer(username='test-user')
        ThreadLocalORMSession.flush_all()
        req = Request.from_request.return_value = {'oauth_consumer_key': 'api_key'}
        r = self.app.post('/rest/oauth/request_token', params={'key': 'value'})

        # dict-ify webob.EnvironHeaders
        call = Request.from_request.call_args_list[0]
        call[1]['headers'] = dict(call[1]['headers'])
        # then check equality
        assert_equal(Request.from_request.call_args_list, [
            mock.call('POST', 'http://localhost/rest/oauth/request_token',
                      headers={'Host': 'localhost:80',
                               'Content-Type': 'application/x-www-form-urlencoded',
                               'Content-Length': '9'},
                      parameters={'key': 'value'},
                      query_string='')
        ])
        Server().verify_request.assert_called_once_with(req, consumer_token.consumer, None)
        request_token = M.OAuthRequestToken.query.get(consumer_token_id=consumer_token._id)
        assert_is_not_none(request_token)
        assert_equal(r.body, request_token.to_string())

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_no_consumer_token(self, Request, Server):
        """Without a matching consumer token the endpoint answers 401."""
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key'}
        self.app.post('/rest/oauth/request_token',
                      params={'key': 'value'}, status=401)

    # Scoped replacement of ``consumer``; restored when the test finishes.
    @mock.patch.object(M.OAuthConsumerToken, 'consumer', mock.Mock())
    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_invalid(self, Request, Server):
        """An ``oauth2.Error`` raised by verify_request results in 401."""
        Server().verify_request.side_effect = oauth2.Error('test_request_token_invalid')
        self._make_consumer(username='test-user')
        ThreadLocalORMSession.flush_all()
        Request.from_request.return_value = {'oauth_consumer_key': 'api_key'}
        self.app.post('/rest/oauth/request_token', params={'key': 'value'}, status=401)

    def test_authorize_ok(self):
        """The authorize page shows the consumer description and the token key."""
        self._make_request_token('oob')
        r = self.app.post('/rest/oauth/authorize', params={'oauth_token': 'api_key'})
        assert_in('ctok_desc', r.body)
        assert_in('api_key', r.body)

    def test_authorize_invalid(self):
        """Authorizing an unknown token answers 401."""
        self.app.post('/rest/oauth/authorize', params={'oauth_token': 'api_key'}, status=401)

    def test_do_authorize_no(self):
        """Answering 'no' removes the request token."""
        self._make_request_token('oob')
        self.app.post('/rest/oauth/do_authorize',
                      params={'no': '1', 'oauth_token': 'api_key'})
        assert_is_none(M.OAuthRequestToken.query.get(api_key='api_key'))

    def test_do_authorize_oob(self):
        """With callback 'oob', answering 'yes' renders a page containing 'PIN: '."""
        self._make_request_token('oob')
        r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key'})
        assert_is_not_none(r.html.find(text=re.compile('^PIN: ')))

    def test_do_authorize_cb(self):
        """With a callback URL, 'yes' redirects there with token and verifier appended."""
        self._make_request_token('http://my.domain.com/callback')
        r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key'})
        assert r.location.startswith('http://my.domain.com/callback?oauth_token=api_key&oauth_verifier=')

    def test_do_authorize_cb_params(self):
        """Query parameters already present on the callback URL are kept in the redirect."""
        self._make_request_token('http://my.domain.com/callback?myparam=foo')
        r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key'})
        assert r.location.startswith('http://my.domain.com/callback?myparam=foo&oauth_token=api_key&oauth_verifier=')

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_no_consumer(self, Request):
        """No consumer token in the database: 401."""
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_no_request(self, Request):
        """Consumer token exists but no request token: 401."""
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        self._make_consumer(description='ctok_desc')
        ThreadLocalORMSession.flush_all()
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_bad_pin(self, Request):
        """Verifier 'bad' against a stored validation pin of 'good': 401."""
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'bad',
        }
        self._make_request_token('http://my.domain.com/callback?myparam=foo',
                                 validation_pin='good')
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_bad_sig(self, Request, Server):
        """An ``oauth2.Error`` raised by verify_request results in 401."""
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        self._make_request_token('http://my.domain.com/callback?myparam=foo',
                                 validation_pin='good')
        Server().verify_request.side_effect = oauth2.Error('test_access_token_bad_sig')
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_ok(self, Request, Server):
        """A matching verifier yields exactly one oauth_token and one oauth_token_secret."""
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        self._make_request_token('http://my.domain.com/callback?myparam=foo',
                                 validation_pin='good')
        r = self.app.get('/rest/oauth/access_token')
        atok = parse_qs(r.body)
        assert_equal(len(atok['oauth_token']), 1)
        assert_equal(len(atok['oauth_token_secret']), 1)
| |
| |
class TestDisableAccount(TestController):
    """Tests for the self-service account disabling pages under ``/auth/disable/``."""

    def test_not_authenticated(self):
        """An anonymous visitor is redirected to the login page with a return_to."""
        resp = self.app.get('/auth/disable/',
                            extra_environ={'username': '*anonymous'})
        assert_equal(resp.status_int, 302)
        expected = 'http://localhost/auth/?return_to=%2Fauth%2Fdisable%2F'
        assert_equal(resp.location, expected)

    def test_lists_user_projects(self):
        """The page lists name and URL of every admin project except ``u/test-admin``."""
        resp = self.app.get('/auth/disable/')
        admin = M.User.by_username('test-admin')
        for project in admin.my_projects_by_role_name('Admin'):
            if project.name != 'u/test-admin':
                assert_in(project.name, resp)
                assert_in(project.url(), resp)

    def test_has_asks_password(self):
        """The page contains a form whose action is ``do_disable``."""
        resp = self.app.get('/auth/disable/')
        disable_form = resp.html.find('form', {'action': 'do_disable'})
        assert disable_form is not None

    def test_bad_password(self):
        """A wrong password is reported and the account stays enabled."""
        self.app.get('/').follow()  # establish session
        params = {
            'password': 'bad',
            '_session_id': self.app.cookies['_session_id'],
        }
        resp = self.app.post('/auth/disable/do_disable', params)
        assert_in('Invalid password', resp)
        admin = M.User.by_username('test-admin')
        assert_equal(admin.disabled, False)

    def test_disable(self):
        """The correct password disables the account and redirects home with an 'ok' flash."""
        self.app.get('/').follow()  # establish session
        params = {
            'password': 'foo',
            '_session_id': self.app.cookies['_session_id'],
        }
        resp = self.app.post('/auth/disable/do_disable', params)
        assert_equal(resp.status_int, 302)
        assert_equal(resp.location, 'http://localhost/')
        flash_msg = json.loads(self.webflash(resp))
        assert_equal(flash_msg['status'], 'ok')
        assert_equal(flash_msg['message'], 'Your account was successfully disabled!')
        admin = M.User.by_username('test-admin')
        assert_equal(admin.disabled, True)
| |
| |
class TestPasswordExpire(TestController):
    """Tests for the forced password change of expired passwords.

    Expiry is switched on per test through the ``auth.pwdexpire.days`` or
    ``auth.pwdexpire.before`` config keys; requests are made as ``test-user``.
    """

    def login(self, username='test-user', pwd='foo', query_string=''):
        """Load the login form anonymously, fill in the credentials and
        return the response of submitting it.

        *query_string* is appended verbatim to ``/auth/``.
        """
        anon_environ = {'username': '*anonymous', 'REMOTE_ADDR': '127.0.0.1'}
        r = self.app.get('/auth/' + query_string, extra_environ=anon_environ)

        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = username
        f[encoded['password']] = pwd
        return f.submit(extra_environ={'username': '*anonymous'})

    def assert_redirects(self, where='/'):
        """Assert that GET *where* is redirected to ``/auth/pwd_expired`` with a matching return_to."""
        resp = self.app.get(where, extra_environ={'username': 'test-user'}, status=302)
        assert_equal(resp.location, 'http://localhost/auth/pwd_expired?' + urlencode({'return_to': where}))

    def assert_not_redirects(self, where='/neighborhood'):
        """Assert that GET *where* answers 200."""
        self.app.get(where, extra_environ={'username': 'test-user'}, status=200)

    def expired(self, r):
        """Return the ``pwd-expired`` value from the session of response *r*."""
        return r.session.get('pwd-expired')

    def set_expire_for_user(self, username='test-user', days=100):
        """Set the user's ``last_password_updated`` to *days* days ago, flush, and return the user."""
        user = M.User.by_username(username)
        user.last_password_updated = datetime.utcnow() - timedelta(days=days)
        session(user).flush(user)
        return user

    def _submit_pwd_expired_form(self, oldpw, pw, pw2, return_to=None, **submit_kwargs):
        """Load ``/auth/pwd_expired`` as test-user, fill its first form and submit it.

        ``return_to`` is written into the form only when given; extra keyword
        arguments (e.g. ``status``) are passed to ``submit``.
        """
        r = self.app.get('/auth/pwd_expired', extra_environ={'username': 'test-user'})
        f = r.forms[0]
        f['oldpw'] = oldpw
        f['pw'] = pw
        f['pw2'] = pw2
        if return_to is not None:
            f['return_to'] = return_to
        return f.submit(extra_environ={'username': 'test-user'}, **submit_kwargs)

    def check_validation(self, oldpw, pw, pw2):
        """Submit the change form with the given values and assert nothing changed.

        Asserts the session is still marked expired and that the user's
        password and ``last_password_updated`` are untouched; returns the
        submit response.
        """
        user = M.User.by_username('test-user')
        old_update_time = user.last_password_updated
        old_password = user.password
        r = self._submit_pwd_expired_form(oldpw, pw, pw2)
        assert_true(self.expired(r))
        user = M.User.by_username('test-user')
        assert_equal(user.last_password_updated, old_update_time)
        assert_equal(user.password, old_password)
        return r

    def test_disabled(self):
        """Without any expiry config a login is never marked expired."""
        r = self.login()
        assert_false(r.session.get('pwd-expired'))
        self.assert_not_redirects()

    def test_days(self):
        """A password changed 100 days ago is fine with a 180-day limit, expired with 90."""
        self.set_expire_for_user()

        with h.push_config(config, **{'auth.pwdexpire.days': 180}):
            r = self.login()
            assert_false(self.expired(r))
            self.assert_not_redirects()

        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

    def test_before(self):
        """Same as test_days, using the ``auth.pwdexpire.before`` timestamp."""
        self.set_expire_for_user()

        before = datetime.utcnow() - timedelta(days=180)
        before = calendar.timegm(before.timetuple())
        with h.push_config(config, **{'auth.pwdexpire.before': before}):
            r = self.login()
            assert_false(self.expired(r))
            self.assert_not_redirects()

        before = datetime.utcnow() - timedelta(days=90)
        before = calendar.timegm(before.timetuple())
        with h.push_config(config, **{'auth.pwdexpire.before': before}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

    def test_logout(self):
        """Logging out clears the expired marker from the session."""
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()
            r = self.app.get('/auth/logout', extra_environ={'username': 'test-user'})
            assert_false(self.expired(r))
            self.assert_not_redirects()

    def test_change_pwd(self):
        """Changing the expired password updates the user and unblocks the session."""
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

            user = M.User.by_username('test-user')
            old_update_time = user.last_password_updated
            old_password = user.password
            r = self._submit_pwd_expired_form('foo', 'qwerty', 'qwerty', status=302)
            assert_equal(r.location, 'http://localhost/')
            assert_false(self.expired(r))
            user = M.User.by_username('test-user')
            assert_true(user.last_password_updated > old_update_time)
            assert_not_equal(user.password, old_password)

            # Can log in with new password and change isn't required anymore
            r = self.login(pwd='qwerty').follow()
            assert_equal(r.location, 'http://localhost/dashboard')
            assert_not_in('Invalid login', r)
            assert_false(self.expired(r))
            self.assert_not_redirects()

            # and can't log in with old password
            r = self.login(pwd='foo')
            assert_in('Invalid login', r)

    def test_expired_pwd_change_invalidates_token(self):
        """Changing an expired password blanks any stored password-recovery hash."""
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()
            user = M.User.by_username('test-user')
            user.set_tool_data('AuthPasswordReset',
                               hash="generated_hash_value",
                               hash_expiry="04-08-2020")
            reset_hash = user.get_tool_data('AuthPasswordReset', 'hash')
            hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')
            assert_equal(reset_hash, 'generated_hash_value')
            assert_equal(hash_expiry, '04-08-2020')
            session(user).flush(user)

            # Change expired password
            r = self._submit_pwd_expired_form('foo', 'qwerty', 'qwerty', status=302)
            assert_equal(r.location, 'http://localhost/')

            user = M.User.by_username('test-user')
            reset_hash = user.get_tool_data('AuthPasswordReset', 'hash')
            hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')

            assert_equal(reset_hash, '')
            assert_equal(hash_expiry, '')

    def test_change_pwd_validation(self):
        """Invalid submissions of the change form are rejected with the matching message."""
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

            r = self.check_validation('', '', '')
            assert_in('Please enter a value', r)
            r = self.check_validation('', 'qwe', 'qwerty')
            assert_in('Enter a value 6 characters long or more', r)
            r = self.check_validation('bad', 'qwerty1', 'qwerty')
            assert_in('Passwords must match', r)
            r = self.check_validation('bad', 'qwerty', 'qwerty')
            assert_in('Incorrect password', self.webflash(r))
            assert_equal(r.location, 'http://localhost/auth/pwd_expired?return_to=')

            # lower the minimum length so that 'foo' is rejected for being
            # unchanged rather than for being too short
            with h.push_config(config, **{'auth.min_password_len': 3}):
                r = self.check_validation('foo', 'foo', 'foo')
                assert_in('Your old and new password should not be the same', r)

    def test_return_to(self):
        """The original return_to target is honoured only after the password was changed."""
        return_to = '/p/test/tickets/?milestone=1.0&page=2'
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login(query_string='?' + urlencode({'return_to': return_to}))
            # don't go to the return_to yet
            assert_equal(r.location, 'http://localhost/auth/pwd_expired?' + urlencode({'return_to': return_to}))

            # but if user tries to go directly there anyway, intercept and redirect back
            self.assert_redirects(where=return_to)

            r = self._submit_pwd_expired_form('foo', 'qwerty', 'qwerty',
                                              return_to=return_to, status=302)
            assert_equal(r.location, 'http://localhost/p/test/tickets/?milestone=1.0&page=2')
| |
| |
class TestCSRFProtection(TestController):
    """Tests for the ``_session_id`` form-token check."""

    def test_blocks_invalid(self):
        """A form post with a bogus ``_session_id`` ends up at the login page."""
        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': 'True', 'REMOTE_ADDR': '127.0.0.1'}

        # regular login
        self.app.get('/auth/')
        login_params = dict(
            username='test-admin',
            password='foo',
            _session_id=self.app.cookies['_session_id'],
        )
        self.app.post('/auth/do_login', params=login_params, antispam=True)

        # regular form submit
        page = self.app.get('/admin/overview')
        result = page.form.submit()
        assert_equal(result.location, 'http://localhost/admin/overview')

        # invalid form submit
        page = self.app.get('/admin/overview')
        page.form['_session_id'] = 'bogus'
        result = page.form.submit()
        assert_equal(result.location, 'http://localhost/auth/')

    def test_blocks_invalid_on_login(self):
        """Submitting the login form with a bogus ``_session_id`` answers 403."""
        page = self.app.get('/auth/')
        page.form['_session_id'] = 'bogus'
        page.form.submit(status=403)

    def test_token_present_on_first_request(self):
        """The very first response already carries a non-empty ``_session_id`` field."""
        page = self.app.get('/auth/')
        token_field = page.form['_session_id']
        assert_true(token_field.value)
| |
| |
| class TestTwoFactor(TestController): |
| |
| sample_key = b'\x00K\xda\xbfv\xc2B\xaa\x1a\xbe\xa5\x96b\xb2\xa0Z:\xc9\xcf\x8a' |
| |
def _init_totp(self, username='test-admin'):
    """Store ``self.sample_key`` as the TOTP secret of *username* and set
    the user's ``multifactor`` preference to True."""
    account = M.User.query.get(username=username)
    service = TotpService().get()
    service.set_secret_key(account, self.sample_key)
    account.set_pref('multifactor', True)
| |
def test_settings_on(self):
    """The preferences page contains the multifactor section."""
    page = self.app.get('/auth/preferences/')
    section = page.html.find(attrs={'class': 'preferences multifactor'})
    assert section
| |
def test_settings_off(self):
    """With ``auth.multifactor.totp`` set to 'false' the section is gone
    and every multifactor URL answers 404 to a POST."""
    multifactor_urls = [
        '/auth/preferences/totp_new',
        '/auth/preferences/totp_view',
        '/auth/preferences/totp_set',
        '/auth/preferences/totp_send_link',
        '/auth/preferences/multifactor_disable',
        '/auth/preferences/multifactor_recovery',
        '/auth/preferences/multifactor_recovery_regen',
        '/auth/multifactor',
        '/auth/do_multifactor',
    ]
    with h.push_config(config, **{'auth.multifactor.totp': 'false'}):
        page = self.app.get('/auth/preferences/')
        section = page.html.find(attrs={'class': 'preferences multifactor'})
        assert not section

        for target in multifactor_urls:
            params = {
                'password': 'foo',
                '_session_id': self.app.cookies['_session_id'],
            }
            self.app.post(target, params, status=404)
| |
def test_user_disabled(self):
    """Without TOTP set up, the multifactor section mentions 'disabled'."""
    page = self.app.get('/auth/preferences/')
    section = page.html.find(attrs={'class': 'preferences multifactor'})
    assert_in('disabled', str(section))
| |
def test_user_enabled(self):
    """After ``_init_totp`` the multifactor section mentions 'enabled'."""
    self._init_totp()
    page = self.app.get('/auth/preferences/')
    section = page.html.find(attrs={'class': 'preferences multifactor'})
    assert_in('enabled', str(section))
| |
def test_reconfirm_auth(self):
    """Password reconfirmation is asked for at first, skipped 1m45s after
    confirming, and asked for again at 2m03s.

    Time is controlled by patching ``datetime`` in ``allura.lib.decorators``.
    """
    from datetime import datetime as real_datetime

    totp_url = '/auth/preferences/totp_new'
    prompt = 'Password Confirmation'

    with patch('allura.lib.decorators.datetime') as fake_datetime:
        # keep the real ``min`` available on the patched name
        fake_datetime.min = real_datetime.min

        # reconfirm required at first
        fake_datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 0, 0)
        page = self.app.get(totp_url)
        assert_in(prompt, page)

        # submit form, and its not required
        page.form['password'] = 'foo'
        page = page.form.submit()
        assert_not_in(prompt, page)

        # still not required
        fake_datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 1, 45)
        page = self.app.get(totp_url)
        assert_not_in(prompt, page)

        # required later
        fake_datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 2, 3)
        page = self.app.get(totp_url)
        assert_in(prompt, page)
| |
def test_enable_totp(self):
    """Set up TOTP through the preferences pages.

    Covers password reconfirmation, rejection of an empty code (the key
    offered must stay the same), acceptance of a valid code, the
    notification mail task, the recovery-codes page, and that a second,
    previously opened session is sent back to login afterwards.
    """
    # create a separate session, for later use in the test
    other_session = TestController()
    other_session.setUp()
    other_session.app.get('/auth/preferences/')

    with out_audits(user=True):
        r = self.app.get('/auth/preferences/totp_new')
        assert_in('Password Confirmation', r)

    with audits('Visited multifactor new TOTP page', user=True):
        r.form['password'] = 'foo'
        r = r.form.submit()
        assert_in('Scan this barcode', r)

    first_key_shown = r.session['totp_new_key']

    # The audit message is matched as a regex, hence the escaped parentheses;
    # a raw string keeps the backslashes from being (invalid) string escapes.
    with audits(r'Failed to set up multifactor TOTP \(wrong code\)', user=True):
        form = r.forms['totp_set']
        form['code'] = ''
        r = form.submit()
        assert_in('Invalid', r)
        assert_equal(first_key_shown, r.session['totp_new_key'])  # different keys on each pageload would be bad!

    new_totp = TotpService().Totp(r.session['totp_new_key'])
    code = new_totp.generate(time_time())
    form = r.forms['totp_set']
    form['code'] = code
    with audits('Set up multifactor TOTP', user=True):
        r = form.submit()
        assert_equal('Two factor authentication has now been set up.', json.loads(self.webflash(r))['message'],
                     self.webflash(r))

    tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
    assert_equal(len(tasks), 1)
    assert_equal(tasks[0].kwargs['subject'], 'Two-Factor Authentication Enabled')
    assert_in('new two-factor authentication', tasks[0].kwargs['text'])

    r = r.follow()
    assert_in('Recovery Codes', r)

    # Confirm any pre-existing sessions have to re-authenticate
    r = other_session.app.get('/auth/preferences/')
    assert_in('/auth/?return_to', r.headers['Location'])
    other_session.tearDown()
| |
def test_reset_totp(self):
    """Replacing an existing TOTP key: the stored key changes only after
    a valid code for the new key has been submitted."""

    def stored_key():
        # secret key currently saved for test-admin
        admin = M.User.query.get(username='test-admin')
        return TotpService.get().get_secret_key(admin)

    self._init_totp()

    # access page
    page = self.app.get('/auth/preferences/totp_new')
    assert_in('Password Confirmation', page)

    # reconfirm password to get to it
    page.form['password'] = 'foo'
    page = page.form.submit()

    # confirm warning message, and key is not changed yet
    assert_in('Scan this barcode', page)
    assert_in('this will invalidate your previous', page)
    assert_equal(self.sample_key, stored_key())

    # incorrect submission
    totp_form = page.forms['totp_set']
    totp_form['code'] = ''
    page = totp_form.submit()
    assert_in('Invalid', page)

    # still unchanged key
    assert_equal(self.sample_key, stored_key())

    # valid submission
    new_key = page.session['totp_new_key']
    valid_code = TotpService().Totp(new_key).generate(time_time())
    totp_form = page.forms['totp_set']
    totp_form['code'] = valid_code
    page = totp_form.submit()
    assert_equal('Two factor authentication has now been set up.', json.loads(self.webflash(page))['message'],
                 self.webflash(page))

    # new key in place
    key_now = stored_key()
    assert_equal(new_key, key_now)
    assert_not_equal(self.sample_key, key_now)
| |
def test_disable(self):
    """Disabling multifactor needs a POST plus password reconfirmation;
    afterwards the pref, secret key and recovery codes are gone and a
    notification mail task exists."""
    self._init_totp()

    self.app.get('/auth/preferences/multifactor_disable', status=405)  # GET not allowed

    # get form and submit
    prefs_page = self.app.get('/auth/preferences/')
    resp = prefs_page.forms['multifactor_disable'].submit()

    # confirm first, no change
    assert_in('Password Confirmation', resp)
    admin = M.User.query.get(username='test-admin')
    assert_equal(admin.get_pref('multifactor'), True)

    # confirm submit, everything goes off
    resp.form['password'] = 'foo'
    with audits('Disabled multifactor TOTP', user=True):
        resp = resp.form.submit()
        assert_equal('Multifactor authentication has now been disabled.', json.loads(self.webflash(resp))['message'],
                     self.webflash(resp))
    admin = M.User.query.get(username='test-admin')
    assert_equal(admin.get_pref('multifactor'), False)
    assert_equal(TotpService().get().get_secret_key(admin), None)
    assert_equal(RecoveryCodeService().get().get_codes(admin), [])

    # email confirmation
    mail_query = dict(task_name='allura.tasks.mail_tasks.sendsimplemail')
    mail_tasks = M.MonQTask.query.find(mail_query).all()
    assert_equal(len(mail_tasks), 1)
    mail_kwargs = mail_tasks[0].kwargs
    assert_equal(mail_kwargs['subject'], 'Two-Factor Authentication Disabled')
    assert_in('disabled two-factor authentication', mail_kwargs['text'])
| |
def test_login_totp(self):
    # Two-step login: password first, then a TOTP code. The session only
    # carries a username once a valid code has been accepted.
    self._init_totp()

    # so test-admin isn't automatically logged in for all requests
    self.app.extra_environ = {'disable_auth_magic': 'True'}

    # step 1: username + password (field names come from antispam_field_names)
    login_page = self.app.get('/auth/?return_to=/p/foo')
    field_names = self.app.antispam_field_names(login_page.form)
    login_page.form[field_names['username']] = 'test-admin'
    login_page.form[field_names['password']] = 'foo'
    with audits('Multifactor login - password ok, code not entered yet', user=True):
        redirect = login_page.form.submit()

    # sent on to the code prompt with return_to preserved; not logged in yet
    assert redirect.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), redirect
    code_page = redirect.follow()
    assert not code_page.session.get('username')

    # a bogus code is refused and the session stays without a username
    code_page.form['code'] = 'invalid-code'
    with audits('Multifactor login - invalid code', user=True):
        code_page = code_page.form.submit()
    assert_in('Invalid code', code_page)
    assert not code_page.session.get('username')

    # a code generated from self.sample_key for the current time is accepted
    valid_code = TotpService().Totp(self.sample_key).generate(time_time())
    code_page.form['code'] = valid_code
    with audits('Successful login', user=True):
        final = code_page.form.submit()

    # confirm login and final page
    assert_equal(final.session['username'], 'test-admin')
    assert final.location.endswith('/p/foo'), final
| |
def test_login_rate_limit(self):
    # After three wrong codes, even a correct code is answered with a
    # rate-limit error and the session still has no username.
    self._init_totp()

    # so test-admin isn't automatically logged in for all requests
    self.app.extra_environ = {'disable_auth_magic': 'True'}

    # regular login
    r = self.app.get('/auth/?return_to=/p/foo')
    encoded = self.app.antispam_field_names(r.form)

    r.form[encoded['username']] = 'test-admin'
    r.form[encoded['password']] = 'foo'
    r = r.form.submit()
    r = r.follow()

    # try some invalid codes
    # (range works on both Python 2 and 3; the loop index itself is not needed)
    for _ in range(3):
        r.form['code'] = 'invalid-code'
        r = r.form.submit()
        assert_in('Invalid code', r)

    # use a valid code, but it'll hit rate limit
    totp = TotpService().Totp(self.sample_key)
    code = totp.generate(time_time())
    r.form['code'] = code
    with audits('Multifactor login - rate limit', user=True):
        r = r.form.submit()

    assert_in('rate limit exceeded', r)
    assert not r.session.get('username')
| |
def test_login_totp_disrupted(self):
    # If another page is requested between the password step and the code
    # step, submitting the code form sends the user back to the password login.
    self._init_totp()

    # so test-admin isn't automatically logged in for all requests
    self.app.extra_environ = {'disable_auth_magic': 'True'}

    # regular login
    r = self.app.get('/auth/')
    encoded = self.app.antispam_field_names(r.form)
    r.form[encoded['username']] = 'test-admin'
    r.form[encoded['password']] = 'foo'
    r = r.form.submit()
    r = r.follow()

    # go to some other page instead of filling out the 2FA code
    # (only the request matters here; its response is not inspected)
    self.app.get('/')

    # then try to complete the 2FA form
    totp = TotpService().Totp(self.sample_key)
    code = totp.generate(time_time())
    r.form['code'] = code
    r = r.form.submit()

    # sent back to regular login
    assert_equal('Your multifactor login was disrupted, please start over.', json.loads(self.webflash(r))['message'],
                 self.webflash(r))
    r = r.follow()
    assert_in('Password Login', r)
| |
def test_login_recovery_code(self):
    # Second login step done in 'recovery' mode: an invalid code is refused,
    # a freshly generated recovery code logs the user in and is then no
    # longer among the user's codes.
    self._init_totp()

    # so test-admin isn't automatically logged in for all requests
    self.app.extra_environ = {'disable_auth_magic': 'True'}

    # step 1: username + password
    login_page = self.app.get('/auth/?return_to=/p/foo')
    field_names = self.app.antispam_field_names(login_page.form)
    login_page.form[field_names['username']] = 'test-admin'
    login_page.form[field_names['password']] = 'foo'
    redirect = login_page.form.submit()

    # sent on to the code prompt; not logged in yet
    assert redirect.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), redirect
    code_page = redirect.follow()
    assert not code_page.session.get('username')

    # switch the form from the default mode to recovery codes
    code_page.form['mode'] = 'recovery'

    # a bogus code is refused
    code_page.form['code'] = 'invalid-code'
    code_page = code_page.form.submit()
    assert_in('Invalid code', code_page)
    assert not code_page.session.get('username')

    # generate a fresh set of codes for the user and submit the first one
    admin = M.User.by_username('test-admin')
    code_service = RecoveryCodeService().get()
    code_service.regenerate_codes(admin)
    fresh_code = code_service.get_codes(admin)[0]
    code_page.form['code'] = fresh_code
    with audits('Logged in using a multifactor recovery code', user=True):
        final = code_page.form.submit()

    # confirm login and final page
    assert_equal(final.session['username'], 'test-admin')
    assert final.location.endswith('/p/foo'), final

    # confirm code used up
    remaining_codes = RecoveryCodeService().get().get_codes(admin)
    assert_not_in(fresh_code, remaining_codes)
| |
@patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
def test_login_totp_with_hibp(self):
    # Same flow as the regular TOTP login test, run with the HIBP password
    # check patched to report enabled, to make sure it doesn't get in the way.
    # It shouldn't even run since a password isn't present when the final
    # login happens.
    self._init_totp()

    # so test-admin isn't automatically logged in for all requests
    self.app.extra_environ = {'disable_auth_magic': 'True'}

    # step 1: username + password
    login_page = self.app.get('/auth/?return_to=/p/foo')
    field_names = self.app.antispam_field_names(login_page.form)
    login_page.form[field_names['username']] = 'test-admin'
    login_page.form[field_names['password']] = 'foo'
    with audits('Multifactor login - password ok, code not entered yet', user=True):
        redirect = login_page.form.submit()

    # sent on to the code prompt; not logged in yet
    assert redirect.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), redirect
    code_page = redirect.follow()
    assert not code_page.session.get('username')

    # step 2: a code generated from self.sample_key for the current time
    valid_code = TotpService().Totp(self.sample_key).generate(time_time())
    code_page.form['code'] = valid_code
    with audits('Successful login', user=True):
        final = code_page.form.submit()

    # confirm login and final page
    assert_equal(final.session['username'], 'test-admin')
    assert final.location.endswith('/p/foo'), final
| |
def test_view_key(self):
    # Viewing the existing TOTP config: the password prompt is checked under
    # out_audits, and the view audit message is expected once the password
    # has been confirmed.
    self._init_totp()

    with out_audits(user=True):
        prompt = self.app.get('/auth/preferences/totp_view')
        assert_in('Password Confirmation', prompt)

    with audits('Viewed multifactor TOTP config page', user=True):
        prompt.form['password'] = 'foo'
        key_page = prompt.form.submit()
        assert_in('Scan this barcode', key_page)
| |
def test_view_recovery_codes_and_regen(self):
    # Recovery codes page: gated by password confirmation, audited when
    # viewed, and regenerating the codes is audited and queues one email.
    self._init_totp()

    # reconfirm password
    with out_audits(user=True):
        prompt = self.app.get('/auth/preferences/multifactor_recovery')
        assert_in('Password Confirmation', prompt)

    # actual visit
    with audits('Viewed multifactor recovery codes', user=True):
        prompt.form['password'] = 'foo'
        codes_page = prompt.form.submit()
        assert_in('Download', codes_page)
        assert_in('Print', codes_page)

    # regenerate codes (the response of this submit is not inspected)
    regen_form = codes_page.forms['multifactor_recovery_regen']
    with audits('Regenerated multifactor recovery codes', user=True):
        regen_form.submit()

    # email confirmation
    mail_tasks = M.MonQTask.query.find(
        dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
    assert_equal(len(mail_tasks), 1)
    mail_kwargs = mail_tasks[0].kwargs
    assert_equal(mail_kwargs['subject'], 'Two-Factor Recovery Codes Regenerated')
    assert_in('regenerated', mail_kwargs['text'])
| |
def test_send_links(self):
    # Submitting the 'totp_send_link' form on the TOTP setup page queues
    # exactly one email whose text mentions both app stores.
    prompt = self.app.get('/auth/preferences/totp_new')
    prompt.form['password'] = 'foo'
    setup_page = prompt.form.submit()

    # the response of this submit is not inspected; only the queued task is
    setup_page.forms['totp_send_link'].submit()

    mail_tasks = M.MonQTask.query.find(
        dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
    assert_equal(len(mail_tasks), 1)
    mail_kwargs = mail_tasks[0].kwargs
    assert_equal(mail_kwargs['subject'], 'Two-Factor Authentication Apps')
    assert_in('itunes.apple.com', mail_kwargs['text'])
    assert_in('play.google.com', mail_kwargs['text'])