| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import calendar |
| from base64 import b32encode |
| from datetime import datetime, time, timedelta |
| from time import time as time_time |
| import json |
| |
| from six.moves.urllib.parse import urlparse, parse_qs |
| from six.moves.urllib.parse import urlencode |
| |
| from bson import ObjectId |
| import re |
| |
| from ming.orm.ormsession import ThreadLocalORMSession, session |
| from tg import config, expose |
| from mock import patch, Mock |
| import mock |
| from alluratest.tools import ( |
| assert_equal, |
| assert_not_equal, |
| assert_is_none, |
| assert_is_not_none, |
| assert_in, |
| assert_not_in, |
| assert_true, |
| assert_false, |
| ) |
| from tg import tmpl_context as c, app_globals as g |
| |
| from allura.tests import TestController |
| from allura.tests import decorators as td |
| from allura.tests.decorators import audits, out_audits, assert_logmsg |
| from allura.tests.pytest_helpers import with_nose_compatibility |
| from alluratest.controller import setup_trove_categories, TestRestApiBase, oauth1_webtest |
| from allura import model as M |
| from allura.model.oauth import dummy_oauths |
| from allura.lib import plugin |
| from allura.lib import helpers as h |
| from allura.lib.multifactor import TotpService, RecoveryCodeService |
| |
| |
| def unentity(s): |
| return s.replace('"', '"').replace('"', '"') |
| |
| |
| @with_nose_compatibility |
| class TestAuth(TestController): |
| def test_login(self): |
| self.app.get('/auth/') |
| r = self.app.post('/auth/send_verification_link', params=dict(a='test@example.com', |
| _session_id=self.app.cookies['_session_id'])) |
| email = M.User.query.get(username='test-admin').email_addresses[0] |
| r = self.app.post('/auth/send_verification_link', params=dict(a=email, |
| _session_id=self.app.cookies['_session_id'])) |
| ThreadLocalORMSession.flush_all() |
| r = self.app.get('/auth/verify_addr', params=dict(a='foo')) |
| assert json.loads(self.webflash(r))['status'] == 'error', self.webflash(r) |
| ea = M.EmailAddress.find({'email': email}).first() |
| r = self.app.get('/auth/verify_addr', params=dict(a=ea.nonce)) |
| assert json.loads(self.webflash(r))['status'] == 'ok', self.webflash(r) |
| r = self.app.get('/auth/logout') |
| |
| with audits('Successful login', user=True): |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='foo', |
| _session_id=self.app.cookies['_session_id']), |
| antispam=True).follow() |
| assert r.headers['Location'] == 'http://localhost/dashboard' |
| |
| r = self.app.post('/auth/do_login', antispam=True, params=dict( |
| username='test-user', password='foo', honey1='robot', # bad honeypot value |
| _session_id=self.app.cookies['_session_id']), |
| extra_environ={'regular_antispam_err_handling_even_when_tests': 'true'}, |
| status=302) |
| wf = json.loads(self.webflash(r)) |
| assert wf['status'] == 'error' |
| assert wf['message'] == 'Spambot protection engaged' |
| |
| with audits('Failed login', user=True): |
| r = self.app.post('/auth/do_login', antispam=True, params=dict( |
| username='test-user', password='food', |
| _session_id=self.app.cookies['_session_id'])) |
| assert 'Invalid login' in str(r), r.showbrowser() |
| |
| r = self.app.post('/auth/do_login', antispam=True, params=dict( |
| username='test-usera', password='foo', |
| _session_id=self.app.cookies['_session_id'])) |
| assert 'Invalid login' in str(r), r.showbrowser() |
| |
| def test_login_invalid_username(self): |
| extra = {'username': '*anonymous'} |
| r = self.app.get('/auth/', extra_environ=extra) |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test@user.com' |
| f[encoded['password']] = 'foo' |
| r = f.submit(extra_environ={'username': '*anonymous'}) |
| r.mustcontain('Usernames only include small letters, ') |
| |
| def test_login_diff_ips_ok(self): |
| # exercises AntiSpam.validate methods |
| extra = {'username': '*anonymous', 'REMOTE_ADDR': '11.22.33.44'} |
| r = self.app.get('/auth/', extra_environ=extra) |
| |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| with audits('Successful login', user=True): |
| r = f.submit(extra_environ={'username': '*anonymous', 'REMOTE_ADDR': '11.22.33.99'}) |
| |
| def test_login_diff_ips_bad(self): |
| # exercises AntiSpam.validate methods |
| extra = {'username': '*anonymous', 'REMOTE_ADDR': '24.52.32.123'} |
| r = self.app.get('/auth/', extra_environ=extra) |
| |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| r = f.submit(extra_environ={'username': '*anonymous', 'REMOTE_ADDR': '11.22.33.99', |
| 'regular_antispam_err_handling_even_when_tests': 'true'}, |
| status=302) |
| wf = json.loads(self.webflash(r)) |
| assert wf['status'] == 'error' |
| assert wf['message'] == 'Spambot protection engaged' |
| |
| @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True)) |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| def test_login_hibp_compromised_password_untrusted_client(self, sendsimplemail): |
| # first & only login by this user, so won't have any trusted previous logins |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| r = self.app.get('/auth/') |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| |
| with audits('Attempted login from untrusted location with password in HIBP breach database', user=True): |
| r = f.submit(status=200) |
| |
| r.mustcontain('reset your password via email.') |
| r.mustcontain('reset your password via email.<br>\nPlease check your email') |
| |
| args, kwargs = sendsimplemail.post.call_args |
| assert sendsimplemail.post.call_count == 1 |
| assert kwargs['subject'] == 'Update your %s password' % config['site_name'] |
| assert '/auth/forgotten_password/' in kwargs['text'] |
| |
| assert [] == M.UserLoginDetails.query.find().all() # no records created |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| def test_login_hibp_compromised_password_trusted_client(self, sendsimplemail): |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| |
| # regular login first, so IP address will be recorded and then trusted |
| r = self.app.get('/auth/') |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| with audits('Successful login', user=True): |
| f.submit(status=302) |
| self.app.get('/auth/logout') |
| |
| # this login will get caught by HIBP check, but trusted due to IP address being same |
| with patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True)): |
| r = self.app.get('/auth/') |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| |
| with audits(r'Successful login with password in HIBP breach database, from trusted source ' |
| r'\(reason: exact ip\)', user=True): |
| r = f.submit(status=302) |
| |
| assert r.session.get('pwd-expired') |
| assert r.session.get('expired-reason') == 'hibp' |
| assert r.location == 'http://localhost/auth/pwd_expired' |
| |
| r = r.follow() |
| r.mustcontain('must be updated to be more secure') |
| |
| # changing password covered in TestPasswordExpire |
| |
| def test_login_disabled(self): |
| u = M.User.query.get(username='test-user') |
| u.disabled = True |
| r = self.app.get('/auth/', extra_environ={'username': '*anonymous'}) |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| with audits('Failed login', user=True): |
| r = f.submit(extra_environ={'username': '*anonymous'}) |
| |
| def test_login_pending(self): |
| u = M.User.query.get(username='test-user') |
| u.pending = True |
| r = self.app.get('/auth/', extra_environ={'username': '*anonymous'}) |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| with audits('Failed login', user=True): |
| r = f.submit(extra_environ={'username': '*anonymous'}) |
| |
| def test_login_overlay(self): |
| r = self.app.get('/auth/login_fragment/', extra_environ={'username': '*anonymous'}) |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = 'test-user' |
| f[encoded['password']] = 'foo' |
| with audits('Successful login', user=True): |
| r = f.submit(extra_environ={'username': '*anonymous'}) |
| |
| def test_logout(self): |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| nav_pattern = ('nav', {'class': 'nav-main'}) |
| r = self.app.get('/auth/') |
| |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='foo', |
| _session_id=self.app.cookies['_session_id']), |
| extra_environ={'REMOTE_ADDR': '127.0.0.1'}, |
| antispam=True).follow().follow() |
| |
| logged_in_session = r.session['_id'] |
| links = r.html.find(*nav_pattern).findAll('a') |
| assert links[-1].string == "Log Out" |
| |
| r = self.app.get('/auth/logout').follow().follow() |
| logged_out_session = r.session['_id'] |
| assert logged_in_session is not logged_out_session |
| links = r.html.find(*nav_pattern).findAll('a') |
| assert links[-1].string == 'Log In' |
| |
| def test_track_login(self): |
| user = M.User.by_username('test-user') |
| assert user.last_access['login_date'] == None |
| assert user.last_access['login_ip'] == None |
| assert user.last_access['login_ua'] == None |
| |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/do_login', |
| headers={'User-Agent': 'browser'}, |
| extra_environ={'REMOTE_ADDR': '127.0.0.1'}, |
| params=dict( |
| username='test-user', |
| password='foo', |
| _session_id=self.app.cookies['_session_id'], |
| ), |
| antispam=True, |
| ) |
| user = M.User.by_username('test-user') |
| assert user.last_access['login_date'] != None |
| assert user.last_access['login_ip'] == '127.0.0.1' |
| assert user.last_access['login_ua'] == 'browser' |
| |
| def test_rememberme(self): |
| username = M.User.query.get(username='test-user').username |
| |
| r = self.app.get('/').follow() # establish session |
| |
| # Login as test-user with remember me checkbox off |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='foo', |
| _session_id=self.app.cookies['_session_id'], |
| ), antispam=True) |
| assert r.session['username'] == username |
| assert r.session['login_expires'] == True |
| |
| for header, contents in r.headerlist: |
| if header == 'Set-cookie': |
| assert 'expires' not in contents |
| |
| # Login as test-user with remember me checkbox on |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='foo', rememberme='on', |
| _session_id=self.app.cookies['_session_id'], |
| ), antispam=True) |
| assert r.session['username'] == username |
| assert r.session['login_expires'] != True |
| |
| for header, contents in r.headerlist: |
| if header == 'Set-cookie': |
| assert 'expires' in contents |
| |
| @td.with_user_project('test-admin') |
| def test_user_can_not_claim_duplicate_emails(self): |
| email_address = 'test_abcd_123@domain.net' |
| user = M.User.query.get(username='test-admin') |
| addresses_number = len(user.email_addresses) |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/preferences/update_emails', |
| params={ |
| 'new_addr.addr': email_address, |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-admin@users.localhost', |
| 'preferences.email_format': 'plain', |
| 'password': 'foo', |
| '_session_id': self.app.cookies['_session_id'], |
| }, |
| extra_environ=dict(username='test-admin')) |
| |
| assert M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).count() == 1 |
| r = self.app.post('/auth/preferences/update_emails', |
| params={ |
| 'new_addr.addr': email_address, |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-admin@users.localhost', |
| 'preferences.email_format': 'plain', |
| 'password': 'foo', |
| '_session_id': self.app.cookies['_session_id'], |
| }, |
| extra_environ=dict(username='test-admin')) |
| |
| assert json.loads(self.webflash(r))['status'] == 'error', self.webflash(r) |
| assert M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).count() == 1 |
| assert len(M.User.query.get(username='test-admin').email_addresses) == addresses_number + 1 |
| |
| @td.with_user_project('test-admin') |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_user_added_claimed_address_by_other_user_confirmed(self, gen_message_id, sendsimplemail): |
| self.app.get('/').follow() # establish session |
| email_address = 'test_abcd_123@domain.net' |
| |
| # test-user claimed & confirmed email address |
| user = M.User.query.get(username='test-user') |
| user.claim_address(email_address) |
| email = M.EmailAddress.find(dict(email=email_address)).first() |
| email.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| |
| # Claiming the same email address by test-admin |
| # the email should be added to the email_addresses list but notifications should not be sent |
| |
| admin = M.User.query.get(username='test-admin') |
| addresses_number = len(admin.email_addresses) |
| r = self.app.post('/auth/preferences/update_emails', |
| params={ |
| 'new_addr.addr': email_address, |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-admin@users.localhost', |
| 'preferences.email_format': 'plain', |
| 'password': 'foo', |
| '_session_id': self.app.cookies['_session_id'], |
| }, |
| extra_environ=dict(username='test-admin')) |
| |
| assert json.loads(self.webflash(r))['status'] == 'ok' |
| assert json.loads(self.webflash(r))['message'] == 'A verification email has been sent. ' \ |
| 'Please check your email and click to confirm.' |
| |
| args, kwargs = sendsimplemail.post.call_args |
| assert sendsimplemail.post.call_count == 1 |
| assert kwargs['toaddr'] == email_address |
| assert kwargs['subject'] == '%s - Email address claim attempt' % config['site_name'] |
| assert "You tried to add %s to your Allura account, " \ |
| "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text'] |
| |
| assert len(M.User.query.get(username='test-admin').email_addresses) == addresses_number + 1 |
| assert len(M.EmailAddress.find(dict(email=email_address)).all()) == 2 |
| |
| @td.with_user_project('test-admin') |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_user_added_claimed_address_by_other_user_not_confirmed(self, gen_message_id, sendsimplemail): |
| email_address = 'test_abcd_1235@domain.net' |
| |
| # test-user claimed email address |
| user = M.User.query.get(username='test-user') |
| user.claim_address(email_address) |
| email = M.EmailAddress.find(dict(email=email_address)).first() |
| email.confirmed = False |
| ThreadLocalORMSession.flush_all() |
| # Claiming the same email address by test-admin |
| # the email should be added to the email_addresses list but notifications should not be sent |
| |
| user1 = M.User.query.get(username='test-user-1') |
| addresses_number = len(user1.email_addresses) |
| self.app.get('/').follow() # establish session |
| r = self.app.post('/auth/preferences/update_emails', |
| params={ |
| 'new_addr.addr': email_address, |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-user-1@users.localhost', |
| 'preferences.email_format': 'plain', |
| 'password': 'foo', |
| '_session_id': self.app.cookies['_session_id'], |
| }, |
| extra_environ=dict(username='test-user-1')) |
| |
| assert json.loads(self.webflash(r))['status'] == 'ok' |
| assert json.loads(self.webflash(r))['message'] == 'A verification email has been sent. ' \ |
| 'Please check your email and click to confirm.' |
| assert sendsimplemail.post.called |
| assert len(M.User.query.get(username='test-user-1').email_addresses) == addresses_number + 1 |
| assert len(M.EmailAddress.find(dict(email=email_address)).all()) == 2 |
| |
| @td.with_user_project('test-admin') |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_user_cannot_claim_more_than_max_limit(self, gen_message_id, sendsimplemail): |
| with h.push_config(config, **{'user_prefs.maximum_claimed_emails': '2'}): |
| self.app.get('/').follow() # establish session |
| r = self.app.post('/auth/preferences/update_emails', |
| params={ |
| 'new_addr.addr': 'test_abcd_1@domain.net', |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-user-1@users.localhost', |
| 'preferences.email_format': 'plain', |
| 'password': 'foo', |
| '_session_id': self.app.cookies['_session_id'], |
| }, |
| extra_environ=dict(username='test-user-1')) |
| assert json.loads(self.webflash(r))['status'] == 'ok' |
| |
| r = self.app.post('/auth/preferences/update_emails', |
| params={ |
| 'new_addr.addr': 'test_abcd_2@domain.net', |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-user-1@users.localhost', |
| 'preferences.email_format': 'plain', |
| 'password': 'foo', |
| '_session_id': self.app.cookies['_session_id'], |
| }, |
| extra_environ=dict(username='test-user-1')) |
| |
| assert json.loads(self.webflash(r))['status'] == 'error' |
| assert json.loads(self.webflash(r))['message'] == 'You cannot claim more than 2 email addresses.' |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_verification_link_for_confirmed_email(self, gen_message_id, sendsimplemail): |
| self.app.get('/').follow() # establish session |
| email_address = 'test_abcd@domain.net' |
| |
| # test-user claimed email address |
| user = M.User.query.get(username='test-user') |
| user.claim_address(email_address) |
| email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first() |
| email.confirmed = True |
| |
| user1 = M.User.query.get(username='test-user-1') |
| user1.claim_address(email_address) |
| email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user1._id)).first() |
| email.confirmed = False |
| |
| ThreadLocalORMSession.flush_all() |
| |
| r = self.app.post('/auth/send_verification_link', |
| params=dict(a=email_address, _session_id=self.app.cookies['_session_id']), |
| extra_environ=dict(username='test-user-1', _session_id=self.app.cookies['_session_id'])) |
| |
| assert json.loads(self.webflash(r))['status'] == 'ok' |
| assert json.loads(self.webflash(r))['message'] == 'Verification link sent' |
| |
| args, kwargs = sendsimplemail.post.call_args |
| assert sendsimplemail.post.call_count == 1 |
| assert kwargs['toaddr'] == email_address |
| assert kwargs['subject'] == '%s - Email address claim attempt' % config['site_name'] |
| assert "You tried to add %s to your Allura account, " \ |
| "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text'] |
| |
| def test_invalidate_verification_link_if_email_was_confirmed(self): |
| self.app.get('/').follow() # establish session |
| email_address = 'test_abcd@domain.net' |
| |
| # test-user claimed email address |
| user = M.User.query.get(username='test-user') |
| user.claim_address(email_address) |
| email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first() |
| email.confirmed = False |
| ThreadLocalORMSession.flush_all() |
| |
| self.app.post('/auth/send_verification_link', |
| params=dict(a=email_address, |
| _session_id=self.app.cookies['_session_id']), |
| extra_environ=dict(username='test-user')) |
| |
| user1 = M.User.query.get(username='test-user-1') |
| user1.claim_address(email_address) |
| email1 = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user1._id)).first() |
| email1.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| # Verify first email with the verification link |
| r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce), |
| extra_environ=dict(username='test-user')) |
| |
| assert json.loads(self.webflash(r))['status'] == 'error' |
| email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first() |
| assert not email.confirmed |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_verify_addr_correct_session(self, gen_message_id, sendsimplemail): |
| self.app.get('/').follow() # establish session |
| email_address = 'test_abcd@domain.net' |
| |
| # test-user claimed email address |
| user = M.User.query.get(username='test-user') |
| user.claim_address(email_address) |
| email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first() |
| email.confirmed = False |
| ThreadLocalORMSession.flush_all() |
| |
| self.app.post('/auth/send_verification_link', |
| params=dict(a=email_address, |
| _session_id=self.app.cookies['_session_id']), |
| extra_environ=dict(username='test-user')) |
| |
| # logged out, gets redirected to login page |
| r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce), |
| extra_environ=dict(username='*anonymous')) |
| assert '/auth/?return_to=%2Fauth%2Fverify_addr' in r.location |
| |
| # logged in as someone else |
| r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce), |
| extra_environ=dict(username='test-admin')) |
| assert '/auth/?return_to=%2Fauth%2Fverify_addr' in r.location |
| assert 'You must be logged in to the correct account' == json.loads(self.webflash(r))['message'] |
| assert 'warning' == json.loads(self.webflash(r))['status'] |
| |
| # logged in as correct user |
| r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce), |
| extra_environ=dict(username='test-user')) |
| assert 'confirmed' in json.loads(self.webflash(r))['message'] |
| assert 'ok' == json.loads(self.webflash(r))['status'] |
| |
| # assert 'email added' notification email sent |
| args, kwargs = sendsimplemail.post.call_args |
| assert kwargs['toaddr'] == user._id |
| assert kwargs['subject'] == 'New Email Address Added' |
| |
| @staticmethod |
| def _create_password_reset_hash(): |
| """ Generates a password reset token for a given user. |
| |
| :return: User object |
| :rtype: User |
| """ |
| # test-user claimed email address |
| user = M.User.by_username('test-admin') |
| user.set_tool_data('AuthPasswordReset', |
| hash="generated_hash_value", |
| hash_expiry="04-08-2020") |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| session(user).flush(user) |
| |
| hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry') |
| assert hash == 'generated_hash_value' |
| assert hash_expiry == '04-08-2020' |
| return user |
| |
| def test_token_generator(self): |
| """ Generates new token invalidation tests. |
| |
| The tests cover: changing, claiming, updating, removing email addresses. |
| :returns: email_change_invalidates_token |
| """ |
| _params = [{'new_addr.addr': 'test_abcd@domain.net', # Change primary address |
| 'primary_addr': 'test@example.com', }, |
| {'new_addr.addr': 'test@example.com', # Claim new address |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-admin@users.localhost', |
| 'password': 'foo', |
| 'preferences.email_format': 'plain'}, |
| {'addr-1.ord': '1', # remove test-admin@users.localhost |
| 'addr-1.delete': 'on', |
| 'addr-2.ord': '2', |
| 'new_addr.addr': '', |
| 'primary_addr': 'test-admin@users.localhost', |
| 'password': 'foo', |
| 'preferences.email_format': 'plain'}, |
| {'addr-1.ord': '1', # Remove email |
| 'addr-2.ord': '2', |
| 'addr-2.delete': 'on', |
| 'new_addr.addr': '', |
| 'primary_addr': 'test-admin@users.localhost'}] |
| |
| for param in _params: |
| yield self.email_change_invalidates_token, param |
| |
| def email_change_invalidates_token(self, change_params): |
| user = self._create_password_reset_hash() |
| session(user).flush(user) |
| |
| self.app.get('/').follow() # establish session |
| change_params['_session_id'] = self.app.cookies['_session_id'] |
| self.app.post('/auth/preferences/update_emails', |
| extra_environ=dict(username='test-admin'), |
| params=change_params) |
| |
| u = M.User.by_username('test-admin') |
| print(u.get_tool_data('AuthPasswordReset', 'hash')) |
| assert u.get_tool_data('AuthPasswordReset', 'hash') == '' |
| assert u.get_tool_data('AuthPasswordReset', 'hash_expiry') == '' |
| |
| @td.with_user_project('test-admin') |
| def test_change_password(self): |
| self.app.get('/').follow() # establish session |
| # Get and assert user with password reset token. |
| user = self._create_password_reset_hash() |
| old_pass = user.get_pref('password') |
| |
| # Change password |
| with audits('Password changed', user=True): |
| self.app.post('/auth/preferences/change_password', |
| extra_environ=dict(username='test-admin'), |
| params={ |
| 'oldpw': 'foo', |
| 'pw': 'asdfasdf', |
| 'pw2': 'asdfasdf', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| |
| # Confirm password was changed. |
| assert old_pass != user.get_pref('password') |
| |
| # Confirm any existing tokens were reset. |
| assert user.get_tool_data('AuthPasswordReset', 'hash') == '' |
| assert user.get_tool_data('AuthPasswordReset', 'hash_expiry') == '' |
| |
| # Confirm an email was sent |
| tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all() |
| assert len(tasks) == 1 |
| assert tasks[0].kwargs['subject'] == 'Password Changed' |
| assert 'The password for your' in tasks[0].kwargs['text'] |
| |
| @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True)) |
| @td.with_user_project('test-admin') |
| def test_change_password_hibp(self): |
| self.app.get('/').follow() # establish session |
| # Get and assert user with password reset token. |
| user = self._create_password_reset_hash() |
| old_pass = user.get_pref('password') |
| |
| # Attempt change password with weak pwd |
| r = self.app.post('/auth/preferences/change_password', |
| extra_environ=dict(username='test-admin'), |
| params={ |
| 'oldpw': 'foo', |
| 'pw': 'password', |
| 'pw2': 'password', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| |
| assert 'Unsafe' in str(r.headers) |
| |
| r = self.app.post('/auth/preferences/change_password', |
| extra_environ=dict(username='test-admin'), |
| params={ |
| 'oldpw': 'foo', |
| 'pw': '3j84rhoirwnoiwrnoiw', |
| 'pw2': '3j84rhoirwnoiwrnoiw', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| assert 'Unsafe' not in str(r.headers) |
| |
| # Confirm password was changed. |
| user = M.User.by_username('test-admin') |
| assert old_pass != user.get_pref('password') |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| @td.with_user_project('test-admin') |
| def test_prefs(self, gen_message_id, sendsimplemail): |
| r = self.app.get('/auth/preferences/', |
| extra_environ=dict(username='test-admin')) |
| # check preconditions of test data |
| assert 'test@example.com' not in r |
| assert 'test-admin@users.localhost' in r |
| assert (M.User.query.get(username='test-admin').get_pref('email_address') == |
| 'test-admin@users.localhost') |
| |
| # add test@example |
| with td.audits('New email address: test@example.com', user=True): |
| r = self.app.post('/auth/preferences/update_emails', |
| extra_environ=dict(username='test-admin'), |
| params={ |
| 'new_addr.addr': 'test@example.com', |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-admin@users.localhost', |
| 'password': 'foo', |
| 'preferences.email_format': 'plain', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| r = self.app.get('/auth/preferences/') |
| assert 'test@example.com' in r |
| user = M.User.query.get(username='test-admin') |
| assert user.get_pref('email_address') == 'test-admin@users.localhost' |
| |
| # remove test-admin@users.localhost |
| with td.audits('Email address deleted: test-admin@users.localhost', user=True): |
| r = self.app.post('/auth/preferences/update_emails', |
| extra_environ=dict(username='test-admin'), |
| params={ |
| 'addr-1.ord': '1', |
| 'addr-1.delete': 'on', |
| 'addr-2.ord': '2', |
| 'new_addr.addr': '', |
| 'primary_addr': 'test-admin@users.localhost', |
| 'password': 'foo', |
| 'preferences.email_format': 'plain', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| |
| # assert 'email_removed' notification email sent |
| args, kwargs = sendsimplemail.post.call_args |
| assert kwargs['toaddr'] == user._id |
| assert kwargs['subject'] == 'Email Address Removed' |
| |
| r = self.app.get('/auth/preferences/') |
| assert 'test-admin@users.localhost' not in r |
| # preferred address has not changed if email is not verified |
| user = M.User.query.get(username='test-admin') |
| assert user.get_pref('email_address') == None |
| |
| with td.audits('Display Name changed Test Admin => Admin', user=True): |
| r = self.app.post('/auth/preferences/update', |
| params={'preferences.display_name': 'Admin', |
| '_session_id': self.app.cookies['_session_id'], |
| }, |
| extra_environ=dict(username='test-admin')) |
| |
| @td.with_user_project('test-admin') |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_email_prefs_change_requires_password(self, gen_message_id, sendsimplemail): |
| self.app.get('/').follow() # establish session |
| # Claim new email |
| new_email_params = { |
| 'new_addr.addr': 'test@example.com', |
| 'new_addr.claim': 'Claim Address', |
| 'primary_addr': 'test-admin@users.localhost', |
| '_session_id': self.app.cookies['_session_id'], |
| } |
| r = self.app.post('/auth/preferences/update_emails', |
| params=new_email_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to claim new email' in self.webflash(r) |
| assert 'test@example.com' not in r.follow() |
| new_email_params['password'] = 'bad pass' |
| |
| r = self.app.post('/auth/preferences/update_emails', |
| params=new_email_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to claim new email' in self.webflash(r) |
| assert 'test@example.com' not in r.follow() |
| new_email_params['password'] = 'foo' # valid password |
| |
| r = self.app.post('/auth/preferences/update_emails', |
| params=new_email_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to claim new email' not in self.webflash(r) |
| assert 'test@example.com' in r.follow() |
| |
| # Change primary address |
| change_primary_params = { |
| 'new_addr.addr': '', |
| 'primary_addr': 'test@example.com', |
| '_session_id': self.app.cookies['_session_id'], |
| } |
| r = self.app.post('/auth/preferences/update_emails', |
| params=change_primary_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to change primary address' in self.webflash(r) |
| assert M.User.by_username('test-admin').get_pref('email_address') == 'test-admin@users.localhost' |
| change_primary_params['password'] = 'bad pass' |
| |
| r = self.app.post('/auth/preferences/update_emails', |
| params=change_primary_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to change primary address' in self.webflash(r) |
| assert M.User.by_username('test-admin').get_pref('email_address') == 'test-admin@users.localhost' |
| change_primary_params['password'] = 'foo' # valid password |
| |
| self.app.get('/auth/preferences/') # let previous 'flash' message cookie get used up |
| r = self.app.post('/auth/preferences/update_emails', |
| params=change_primary_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to change primary address' not in self.webflash(r) |
| assert M.User.by_username('test-admin').get_pref('email_address') == 'test@example.com' |
| |
| # assert 'email added' notification email sent using original primary addr |
| args, kwargs = sendsimplemail.post.call_args |
| assert kwargs['toaddr'] == 'test-admin@users.localhost' |
| assert kwargs['subject'] == 'Primary Email Address Changed' |
| |
| # Remove email |
| remove_email_params = { |
| 'addr-1.ord': '1', |
| 'addr-2.ord': '2', |
| 'addr-2.delete': 'on', |
| 'new_addr.addr': '', |
| 'primary_addr': 'test-admin@users.localhost', |
| '_session_id': self.app.cookies['_session_id'], |
| } |
| r = self.app.post('/auth/preferences/update_emails', |
| params=remove_email_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to delete an email' in self.webflash(r) |
| assert 'test@example.com' in r.follow() |
| remove_email_params['password'] = 'bad pass' |
| r = self.app.post('/auth/preferences/update_emails', |
| params=remove_email_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to delete an email' in self.webflash(r) |
| assert 'test@example.com' in r.follow() |
| remove_email_params['password'] = 'foo' # vallid password |
| r = self.app.post('/auth/preferences/update_emails', |
| params=remove_email_params, |
| extra_environ=dict(username='test-admin')) |
| assert 'You must provide your current password to delete an email' not in self.webflash(r) |
| assert 'test@example.com' not in r.follow() |
| |
| @td.with_user_project('test-admin') |
| def test_prefs_subscriptions(self): |
| r = self.app.get('/auth/subscriptions/', |
| extra_environ=dict(username='test-admin')) |
| subscriptions = M.Mailbox.query.find(dict( |
| user_id=c.user._id, is_flash=False)).all() |
| # make sure page actually lists all the user's subscriptions |
| assert len(subscriptions) > 0, 'Test user has no subscriptions, cannot verify that they are shown' |
| for m in subscriptions: |
| assert str(m._id) in r, "Page doesn't list subscription for Mailbox._id = %s" % m._id |
| |
| # make sure page lists all tools which user can subscribe |
| user = M.User.query.get(username='test-admin') |
| for p in user.my_projects(): |
| for ac in p.app_configs: |
| if not M.Mailbox.subscribed(project_id=p._id, app_config_id=ac._id): |
| if ac.tool_name in ('activity', 'admin', 'search', 'userstats', 'profile'): |
| # these have has_notifications=False |
| assert str(ac._id) not in r, "Page lists tool %s but it should not" % ac.tool_name |
| else: |
| assert str(ac._id) in r, "Page doesn't list tool %s" % ac.tool_name |
| |
| @td.with_user_project('test-admin') |
| def test_update_user_notifications(self): |
| self.app.get('/').follow() # establish session |
| assert not M.User.query.get(username='test-admin').get_pref('mention_notifications') |
| self.app.post('/auth/subscriptions/update_user_notifications', |
| params={'_session_id': self.app.cookies['_session_id'], |
| }) |
| assert not M.User.query.get(username='test-admin').get_pref('mention_notifications') |
| self.app.post('/auth/subscriptions/update_user_notifications', |
| params={'allow_umnotif': 'on', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| assert M.User.query.get(username='test-admin').get_pref('mention_notifications') |
| |
| def _find_subscriptions_form(self, r): |
| form = None |
| for f in r.forms.values(): |
| if f.action == 'update_subscriptions': |
| form = f |
| break |
| assert form is not None, "Can't find subscriptions form" |
| return form |
| |
| def _find_subscriptions_field(self, form, subscribed=False): |
| field_name = None |
| for k, v in form.fields.items(): |
| if subscribed: |
| check = v and v[0].value == 'on' |
| else: |
| check = v and v[0].value != 'on' |
| if k and k.endswith('.subscribed') and check: |
| field_name = k.replace('.subscribed', '') |
| assert field_name, "Can't find unsubscribed tool for user" |
| return field_name |
| |
| @td.with_user_project('test-admin') |
| def test_prefs_subscriptions_subscribe(self): |
| resp = self.app.get('/auth/subscriptions/', |
| extra_environ=dict(username='test-admin')) |
| form = self._find_subscriptions_form(resp) |
| # find not subscribed tool, subscribe and verify |
| field_name = self._find_subscriptions_field(form, subscribed=False) |
| t_id = ObjectId(form.fields[field_name + '.tool_id'][0].value) |
| p_id = ObjectId(form.fields[field_name + '.project_id'][0].value) |
| subscribed = M.Mailbox.subscribed(project_id=p_id, app_config_id=t_id) |
| assert not subscribed, "User already subscribed for tool %s" % t_id |
| form.fields[field_name + '.subscribed'][0].value = 'on' |
| form.submit() |
| subscribed = M.Mailbox.subscribed(project_id=p_id, app_config_id=t_id) |
| assert subscribed, "User is not subscribed for tool %s" % t_id |
| |
| @td.with_user_project('test-admin') |
| def test_prefs_subscriptions_unsubscribe(self): |
| resp = self.app.get('/auth/subscriptions/', |
| extra_environ=dict(username='test-admin')) |
| form = self._find_subscriptions_form(resp) |
| field_name = self._find_subscriptions_field(form, subscribed=True) |
| s_id = ObjectId(form.fields[field_name + '.subscription_id'][0].value) |
| s = M.Mailbox.query.get(_id=s_id) |
| assert s, "User has not subscription with Mailbox._id = %s" % s_id |
| form.fields[field_name + '.subscribed'][0].value = None |
| form.submit() |
| s = M.Mailbox.query.get(_id=s_id) |
| assert not s, "User still has subscription with Mailbox._id %s" % s_id |
| |
| def test_format_email(self): |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/subscriptions/update_subscriptions', |
| params={'email_format': 'plain', 'subscriptions': '', |
| '_session_id': self.app.cookies['_session_id']}) |
| r = self.app.get('/auth/subscriptions/') |
| assert '<option selected value="plain">Plain Text</option>' in r |
| self.app.post('/auth/subscriptions/update_subscriptions', |
| params={'email_format': 'both', 'subscriptions': '', |
| '_session_id': self.app.cookies['_session_id']}) |
| r = self.app.get('/auth/subscriptions/') |
| assert '<option selected value="both">HTML</option>' in r |
| |
| def test_create_account(self): |
| r = self.app.get('/auth/create_account') |
| assert 'Create an Account' in r |
| r = self.app.post('/auth/save_new', |
| params=dict(username='AAA', pw='123', |
| _session_id=self.app.cookies['_session_id'])) |
| assert 'Enter a value 6 characters long or more' in r |
| assert ('Usernames must include only small letters, numbers, ' |
| 'and dashes. They must also start with a letter and be ' |
| 'at least 3 characters long.' in r) |
| r = self.app.post( |
| '/auth/save_new', |
| params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me', |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| r = r.follow().follow() |
| assert 'User "aaa" registered' in unentity(r.text) |
| r = self.app.post( |
| '/auth/save_new', |
| params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me', |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| assert 'That username is already taken. Please choose another.' in r |
| r = self.app.get('/auth/logout') |
| r = self.app.post( |
| '/auth/do_login', |
| params=dict(username='aaa', password='12345678', |
| _session_id=self.app.cookies['_session_id']), antispam=True, |
| status=302) |
| |
| def test_create_account_require_email(self): |
| self.app.get('/').follow() # establish session |
| with h.push_config(config, **{'auth.require_email_addr': 'false'}): |
| self.app.post( |
| '/auth/save_new', |
| params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me', |
| email='test@example.com', |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='aaa') |
| assert not user.pending |
| assert M.Project.query.find({'name': 'u/aaa'}).count() == 1 |
| with h.push_config(config, **{'auth.require_email_addr': 'true'}): |
| self.app.post( |
| '/auth/save_new', |
| params=dict( |
| username='bbb', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me', |
| email='test@example.com', |
| _session_id=self.app.cookies['_session_id'] |
| )) |
| user = M.User.query.get(username='bbb') |
| assert user.pending |
| assert M.Project.query.find({'name': 'u/bbb'}).count() == 0 |
| |
| def test_verify_email(self): |
| with h.push_config(config, **{'auth.require_email_addr': 'true'}): |
| self.app.get('/').follow() # establish session |
| r = self.app.post( |
| '/auth/save_new', |
| params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me', |
| email='test@example.com', |
| _session_id=self.app.cookies['_session_id'] |
| )) |
| r = r.follow() |
| user = M.User.query.get(username='aaa') |
| em = M.EmailAddress.get(email='test@example.com') |
| assert user._id == em.claimed_by_user_id |
| r = self.app.get('/auth/verify_addr', params=dict(a=em.nonce)) |
| user = M.User.query.get(username='aaa') |
| em = M.EmailAddress.get(email='test@example.com') |
| assert not user.pending |
| assert em.confirmed |
| assert user.get_pref('email_address') |
| assert M.Project.query.find({'name': 'u/aaa'}).count() == 1 |
| |
| def test_create_account_disabled_header_link(self): |
| with h.push_config(config, **{'auth.allow_user_registration': 'false'}): |
| r = self.app.get('/') |
| assert 'Register' not in r |
| |
| def test_create_account_disabled_form_gone(self): |
| with h.push_config(config, **{'auth.allow_user_registration': 'false'}): |
| r = self.app.get('/auth/create_account', status=404) |
| assert 'Create an Account' not in r |
| |
| def test_create_account_disabled_submit_fails(self): |
| with h.push_config(config, **{'auth.allow_user_registration': 'false'}): |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/save_new', |
| params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me', |
| _session_id=self.app.cookies['_session_id'] |
| ), |
| status=404) |
| |
| def test_one_project_role(self): |
| """Make sure when a user goes to a new project only one project role is created. |
| There was an issue with extra project roles getting created if a user went directly to |
| an admin page.""" |
| p_nbhd = M.Neighborhood.query.get(name='Projects') |
| p = M.Project.query.get(shortname='test', neighborhood_id=p_nbhd._id) |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/save_new', params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me', |
| email='test@example.com', |
| _session_id=self.app.cookies['_session_id'], |
| )).follow() |
| user = M.User.query.get(username='aaa') |
| user.pending = False |
| session(user).flush(user) |
| assert M.ProjectRole.query.find( |
| dict(user_id=user._id, project_id=p._id)).count() == 0 |
| |
| self.app.get('/p/test/admin/permissions', |
| extra_environ=dict(username='aaa'), status=403) |
| assert M.ProjectRole.query.find( |
| dict(user_id=user._id, project_id=p._id)).count() <= 1 |
| |
| def test_default_lookup(self): |
| # Make sure that default _lookup() throws 404 |
| self.app.get('/auth/foobar', status=404) |
| |
| def test_disabled_user(self): |
| user = M.User.query.get(username='test-admin') |
| sess = session(user) |
| assert not user.disabled |
| r = self.app.get('/p/test/admin/', |
| extra_environ={'username': 'test-admin'}) |
| assert r.status_int == 200, 'Redirect to %s' % r.location |
| user.disabled = True |
| sess.save(user) |
| sess.flush() |
| user = M.User.query.get(username='test-admin') |
| assert user.disabled |
| r = self.app.get('/p/test/admin/', |
| extra_environ={'username': 'test-admin'}) |
| assert r.status_int == 302 |
| assert r.location == 'http://localhost/auth/?return_to=%2Fp%2Ftest%2Fadmin%2F' |
| |
| def test_no_open_return_to(self): |
| r = self.app.get('/auth/logout').follow().follow() |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='foo', |
| return_to='/foo', |
| _session_id=self.app.cookies['_session_id']), |
| antispam=True |
| ) |
| assert r.location == 'http://localhost/foo' |
| |
| r = self.app.get('/auth/logout') |
| r = self.app.post('/auth/do_login', antispam=True, params=dict( |
| username='test-user', password='foo', |
| return_to='http://localhost/foo', |
| _session_id=self.app.cookies['_session_id'])) |
| assert r.location == 'http://localhost/foo' |
| |
| r = self.app.get('/auth/logout') |
| r = self.app.post('/auth/do_login', antispam=True, params=dict( |
| username='test-user', password='foo', |
| return_to='http://example.com/foo', |
| _session_id=self.app.cookies['_session_id'])).follow() |
| assert r.location == 'http://localhost/dashboard' |
| |
| r = self.app.get('/auth/logout') |
| r = self.app.post('/auth/do_login', antispam=True, params=dict( |
| username='test-user', password='foo', |
| return_to='//example.com/foo', |
| _session_id=self.app.cookies['_session_id'])).follow() |
| assert r.location == 'http://localhost/dashboard' |
| |
| def test_no_injected_headers_in_return_to(self): |
| r = self.app.get('/auth/logout').follow().follow() |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='foo', |
| return_to='/foo\nContent-Length: 777', |
| # WebTest actually will raise an error if there's an invalid header (webob itself does not) |
| _session_id=self.app.cookies['_session_id']), |
| antispam=True |
| ) |
| assert r.location == 'http://localhost/' |
| assert r.content_length != 777 |
| |
| |
| @with_nose_compatibility |
| class TestAuthRest(TestRestApiBase): |
| |
| def test_tools_list_anon(self): |
| resp = self.api_get('/rest/auth/tools/wiki', user='*anonymous') |
| assert resp.json == { |
| 'tools': [] |
| } |
| |
| def test_tools_list_invalid_tool(self): |
| resp = self.api_get('/rest/auth/tools/af732q9547235') |
| assert resp.json == { |
| 'tools': [] |
| } |
| |
| @td.with_tool('test', 'Wiki', mount_point='docs', mount_label='Documentation') |
| def test_tools_list_wiki(self): |
| resp = self.api_get('/rest/auth/tools/wiki') |
| assert resp.json == { |
| 'tools': [ |
| { |
| 'mount_label': 'Wiki', |
| 'mount_point': 'wiki', |
| 'name': 'wiki', |
| 'project_name': 'Home Project for Adobe', |
| 'url': 'http://localhost/adobe/wiki/', |
| 'api_url': 'http://localhost/rest/adobe/wiki/', |
| }, |
| { |
| 'mount_label': 'Documentation', |
| 'mount_point': 'docs', |
| 'name': 'wiki', |
| 'project_name': 'Test Project', |
| 'url': 'http://localhost/p/test/docs/', |
| 'api_url': 'http://localhost/rest/p/test/docs/', |
| }, |
| ] |
| } |
| |
| |
| @with_nose_compatibility |
| class TestPreferences(TestController): |
| @td.with_user_project('test-admin') |
| def test_personal_data(self): |
| from pytz import country_names |
| |
| setsex, setbirthdate, setcountry, setcity, settimezone = \ |
| ('Male', '19/08/1988', 'IT', 'Milan', 'Europe/Rome') |
| self.app.get('/auth/user_info/') |
| |
| # Check if personal data is properly set |
| r = self.app.post('/auth/user_info/change_personal_data', |
| params=dict( |
| sex=setsex, |
| birthdate=setbirthdate, |
| country=setcountry, |
| city=setcity, |
| timezone=settimezone, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| sex = user.sex |
| assert sex == setsex |
| birthdate = user.birthdate.strftime('%d/%m/%Y') |
| assert birthdate == setbirthdate |
| country = user.localization.country |
| assert country_names.get(setcountry) == country |
| city = user.localization.city |
| assert city == setcity |
| timezone = user.timezone |
| assert timezone == settimezone |
| |
| # Check if setting a wrong date everything works correctly |
| r = self.app.post('/auth/user_info/change_personal_data', |
| params=dict(birthdate='30/02/1998', _session_id=self.app.cookies['_session_id'])) |
| assert 'Please enter a valid date' in r.text |
| user = M.User.query.get(username='test-admin') |
| sex = user.sex |
| assert sex == setsex |
| birthdate = user.birthdate.strftime('%d/%m/%Y') |
| assert birthdate == setbirthdate |
| country = user.localization.country |
| assert country_names.get(setcountry) == country |
| city = user.localization.city |
| assert city == setcity |
| timezone = user.timezone |
| assert timezone == settimezone |
| |
| # Check deleting birthdate |
| r = self.app.post('/auth/user_info/change_personal_data', |
| params=dict( |
| sex=setsex, |
| birthdate='', |
| country=setcountry, |
| city=setcity, |
| timezone=settimezone, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert user.birthdate is None |
| |
| @td.with_user_project('test-admin') |
| def test_contacts(self): |
| # Add skype account |
| testvalue = 'testaccount' |
| self.app.get('/auth/user_info/contacts/') |
| self.app.post('/auth/user_info/contacts/skype_account', |
| params=dict(skypeaccount=testvalue, _session_id=self.app.cookies['_session_id'])) |
| user = M.User.query.get(username='test-admin') |
| assert user.skypeaccount == testvalue |
| |
| # Add social network account |
| socialnetwork = 'Facebook' |
| accounturl = 'http://www.facebook.com/test' |
| self.app.post('/auth/user_info/contacts/add_social_network', |
| params=dict(socialnetwork=socialnetwork, |
| accounturl=accounturl, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.socialnetworks) == 1 |
| assert user.socialnetworks[0].socialnetwork == socialnetwork |
| assert user.socialnetworks[0].accounturl == accounturl |
| |
| # Add second social network account |
| socialnetwork2 = 'Twitter' |
| accounturl2 = 'http://twitter.com/test' |
| self.app.post('/auth/user_info/contacts/add_social_network', |
| params=dict(socialnetwork=socialnetwork2, |
| accounturl='@test', |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.socialnetworks) == 2 |
| assert {'socialnetwork': socialnetwork, 'accounturl': accounturl} in user.socialnetworks |
| assert {'socialnetwork': socialnetwork2, 'accounturl': accounturl2} in user.socialnetworks |
| |
| # Remove first social network account |
| self.app.post('/auth/user_info/contacts/remove_social_network', |
| params=dict(socialnetwork=socialnetwork, |
| account=accounturl, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.socialnetworks) == 1 |
| assert {'socialnetwork': socialnetwork2, 'accounturl': accounturl2} in user.socialnetworks |
| |
| # Add empty social network account |
| self.app.post('/auth/user_info/contacts/add_social_network', |
| params=dict(accounturl=accounturl, socialnetwork='', |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.socialnetworks) == 1 |
| assert {'socialnetwork': socialnetwork2, 'accounturl': accounturl2} in user.socialnetworks |
| |
| # Add invalid social network account |
| self.app.post('/auth/user_info/contacts/add_social_network', |
| params=dict(accounturl=accounturl, socialnetwork='invalid', |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.socialnetworks) == 1 |
| assert {'socialnetwork': socialnetwork2, 'accounturl': accounturl2} in user.socialnetworks |
| |
| # Add telephone number |
| telnumber = '+3902123456' |
| self.app.post('/auth/user_info/contacts/add_telnumber', |
| params=dict(newnumber=telnumber, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert (len(user.telnumbers) == 1 and (user.telnumbers[0] == telnumber)) |
| |
| # Add second telephone number |
| telnumber2 = '+3902654321' |
| self.app.post('/auth/user_info/contacts/add_telnumber', |
| params=dict(newnumber=telnumber2, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert (len(user.telnumbers) == 2 and telnumber in user.telnumbers and telnumber2 in user.telnumbers) |
| |
| # Remove first telephone number |
| self.app.post('/auth/user_info/contacts/remove_telnumber', |
| params=dict(oldvalue=telnumber, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert (len(user.telnumbers) == 1 and telnumber2 in user.telnumbers) |
| |
| # Add website |
| website = 'http://www.testurl.com' |
| self.app.post('/auth/user_info/contacts/add_webpage', |
| params=dict(newwebsite=website, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert (len(user.webpages) == 1 and (website in user.webpages)) |
| |
| # Add second website |
| website2 = 'http://www.testurl2.com' |
| self.app.post('/auth/user_info/contacts/add_webpage', |
| params=dict(newwebsite=website2, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert (len(user.webpages) == 2 and website in user.webpages and website2 in user.webpages) |
| |
| # Remove first website |
| self.app.post('/auth/user_info/contacts/remove_webpage', |
| params=dict(oldvalue=website, |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert (len(user.webpages) == 1 and website2 in user.webpages) |
| |
| @td.with_user_project('test-admin') |
| def test_availability(self): |
| # Add availability timeslot |
| weekday = 'Monday' |
| starttime = time(9, 0, 0) |
| endtime = time(12, 0, 0) |
| |
| self.app.get('/auth/user_info/availability/') |
| r = self.app.post('/auth/user_info/availability/add_timeslot', |
| params=dict( |
| weekday=weekday, |
| starttime=starttime.strftime('%H:%M'), |
| endtime=endtime.strftime('%H:%M'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| timeslot1dict = dict(week_day=weekday, start_time=starttime, end_time=endtime) |
| assert len(user.availability) == 1 and timeslot1dict in user.get_availability_timeslots() |
| |
| weekday2 = 'Tuesday' |
| starttime2 = time(14, 0, 0) |
| endtime2 = time(16, 0, 0) |
| |
| # Add second availability timeslot |
| r = self.app.post('/auth/user_info/availability/add_timeslot', |
| params=dict( |
| weekday=weekday2, |
| starttime=starttime2.strftime('%H:%M'), |
| endtime=endtime2.strftime('%H:%M'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| timeslot2dict = dict(week_day=weekday2, start_time=starttime2, end_time=endtime2) |
| assert len(user.availability) == 2 |
| assert timeslot1dict in user.get_availability_timeslots() |
| assert timeslot2dict in user.get_availability_timeslots() |
| |
| # Remove availability timeslot |
| r = self.app.post('/auth/user_info/availability/remove_timeslot', |
| params=dict( |
| weekday=weekday, |
| starttime=starttime.strftime('%H:%M'), |
| endtime=endtime.strftime('%H:%M'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.availability) == 1 and timeslot2dict in user.get_availability_timeslots() |
| |
| # Add invalid availability timeslot |
| r = self.app.post('/auth/user_info/availability/add_timeslot', |
| params=dict( |
| weekday=weekday2, |
| starttime=endtime2.strftime('%H:%M'), |
| endtime=starttime2.strftime('%H:%M'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| assert 'Invalid period:' in str(r) |
| user = M.User.query.get(username='test-admin') |
| timeslot2dict = dict(week_day=weekday2, start_time=starttime2, end_time=endtime2) |
| assert len(user.availability) == 1 and timeslot2dict in user.get_availability_timeslots() |
| |
| @td.with_user_project('test-admin') |
| def test_inactivity(self): |
| # Add inactivity period |
| now = datetime.utcnow().date() |
| now = datetime(now.year, now.month, now.day) |
| startdate = now + timedelta(days=1) |
| enddate = now + timedelta(days=7) |
| self.app.get('/auth/user_info/availability/') |
| r = self.app.post('/auth/user_info/availability/add_inactive_period', |
| params=dict( |
| startdate=startdate.strftime('%d/%m/%Y'), |
| enddate=enddate.strftime('%d/%m/%Y'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| period1dict = dict(start_date=startdate, end_date=enddate) |
| assert len(user.inactiveperiod) == 1 and period1dict in user.get_inactive_periods() |
| |
| # Add second inactivity period |
| startdate2 = now + timedelta(days=24) |
| enddate2 = now + timedelta(days=28) |
| r = self.app.post('/auth/user_info/availability/add_inactive_period', |
| params=dict( |
| startdate=startdate2.strftime('%d/%m/%Y'), |
| enddate=enddate2.strftime('%d/%m/%Y'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| period2dict = dict(start_date=startdate2, end_date=enddate2) |
| assert len(user.inactiveperiod) == 2 |
| assert period1dict in user.get_inactive_periods() |
| assert period2dict in user.get_inactive_periods() |
| |
| # Remove first inactivity period |
| r = self.app.post( |
| '/auth/user_info/availability/remove_inactive_period', |
| params=dict( |
| startdate=startdate.strftime('%d/%m/%Y'), |
| enddate=enddate.strftime('%d/%m/%Y'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.inactiveperiod) == 1 and period2dict in user.get_inactive_periods() |
| |
| # Add invalid inactivity period |
| r = self.app.post('/auth/user_info/availability/add_inactive_period', |
| params=dict( |
| startdate='NOT/A/DATE', |
| enddate=enddate2.strftime('%d/%m/%Y'), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert 'Please enter a valid date' in str(r) |
| assert len(user.inactiveperiod) == 1 and period2dict in user.get_inactive_periods() |
| |
| @td.with_user_project('test-admin') |
| def test_skills(self): |
| setup_trove_categories() |
| # Add a skill |
| skill_cat = M.TroveCategory.query.get(show_as_skill=True) |
| level = 'low' |
| comment = 'test comment' |
| self.app.get('/auth/user_info/skills/') |
| self.app.post('/auth/user_info/skills/save_skill', |
| params=dict( |
| level=level, |
| comment=comment, |
| selected_skill=str(skill_cat.trove_cat_id), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| skilldict = dict(category_id=skill_cat._id, |
| comment=comment, level=level) |
| assert len(user.skills) == 1 and skilldict in user.skills |
| |
| # Add again the same skill |
| level = 'medium' |
| comment = 'test comment 2' |
| self.app.get('/auth/user_info/skills/') |
| self.app.post('/auth/user_info/skills/save_skill', |
| params=dict( |
| level=level, |
| comment=comment, |
| selected_skill=str(skill_cat.trove_cat_id), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| skilldict = dict(category_id=skill_cat._id, |
| comment=comment, level=level) |
| assert len(user.skills) == 1 and skilldict in user.skills |
| |
| # Add an invalid skill |
| level2 = 'not a level' |
| comment2 = 'test comment 2' |
| self.app.post('/auth/user_info/skills/save_skill', |
| params=dict( |
| level=level2, |
| comment=comment2, |
| selected_skill=str(skill_cat.trove_cat_id), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| # Check that everything is as it was before |
| assert len(user.skills) == 1 and skilldict in user.skills |
| |
| # Remove a skill |
| self.app.get('/auth/user_info/skills/') |
| self.app.post('/auth/user_info/skills/remove_skill', |
| params=dict( |
| categoryid=str(skill_cat.trove_cat_id), |
| _session_id=self.app.cookies['_session_id'], |
| )) |
| user = M.User.query.get(username='test-admin') |
| assert len(user.skills) == 0 |
| |
| @td.with_user_project('test-admin') |
| def test_user_message(self): |
| self.app.get('/').follow() # establish session |
| assert not M.User.query.get(username='test-admin').get_pref('disable_user_messages') |
| self.app.post('/auth/preferences/user_message', |
| params={'_session_id': self.app.cookies['_session_id'], |
| }) |
| assert M.User.query.get(username='test-admin').get_pref('disable_user_messages') |
| self.app.post('/auth/preferences/user_message', |
| params={'allow_user_messages': 'on', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| assert not M.User.query.get(username='test-admin').get_pref('disable_user_messages') |
| |
| @td.with_user_project('test-admin') |
| def test_additional_page(self): |
| class MyPP(plugin.UserPreferencesProvider): |
| def not_page(self): |
| return 'not page' |
| |
| @expose() |
| def new_page(self): |
| return 'new page' |
| |
| with mock.patch.object(plugin.UserPreferencesProvider, 'get') as upp_get: |
| upp_get.return_value = MyPP() |
| r = self.app.get('/auth/new_page') |
| assert r.text == 'new page' |
| self.app.get('/auth/not_page', status=404) |
| |
| |
| @with_nose_compatibility |
| class TestPasswordReset(TestController): |
| test_primary_email = 'testprimaryaddr@mail.com' |
| |
| def setUp(self): |
| super().setup_method(method) |
| # so test-admin isn't automatically logged in for all requests |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| |
| @patch('allura.model.User.send_password_reset_email') |
| @patch('allura.lib.plugin.LocalAuthenticationProvider.resend_verification_link') |
| @patch('allura.tasks.mail_tasks.sendmail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_email_unconfirmed(self, gen_message_id, sendmail, p_sendlink, p_sendpwd): |
| user = M.User.query.get(username='test-admin') |
| user.pending = True |
| email = M.EmailAddress.find( |
| {'claimed_by_user_id': user._id}).first() |
| email.confirmed = False |
| ThreadLocalORMSession.flush_all() |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/password_recovery_hash', {'email': email.email, |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| assert hash is None |
| p_sendlink.assert_called_once() |
| p_sendpwd.assert_not_called() |
| |
| @patch('allura.tasks.mail_tasks.sendmail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_user_disabled(self, gen_message_id, sendmail): |
| user = M.User.query.get(username='test-admin') |
| email = M.EmailAddress.find( |
| {'claimed_by_user_id': user._id}).first() |
| user.disabled = True |
| ThreadLocalORMSession.flush_all() |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/password_recovery_hash', {'email': email.email, |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| assert hash is None |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_only_primary_email_reset_allowed(self, gen_message_id, sendmail): |
| self.app.get('/').follow() # establish session |
| user = M.User.query.get(username='test-admin') |
| user.claim_address(self.test_primary_email) |
| user.set_pref('email_address', self.test_primary_email) |
| |
| email = M.EmailAddress.find({'email': self.test_primary_email}).first() |
| email.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| |
| with h.push_config(config, **{'auth.allow_non_primary_email_password_reset': 'false'}): |
| self.app.post('/auth/password_recovery_hash', {'email': self.test_primary_email, |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| assert hash is not None |
| args, kwargs = sendmail.post.call_args |
| assert kwargs['toaddr'] == self.test_primary_email |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_non_primary_email_reset_allowed(self, gen_message_id, sendmail): |
| self.app.get('/').follow() # establish session |
| user = M.User.query.get(username='test-admin') |
| email1 = M.EmailAddress.find({'claimed_by_user_id': user._id}).first() |
| user.claim_address(self.test_primary_email) |
| user.set_pref('email_address', self.test_primary_email) |
| email = M.EmailAddress.find({'email': self.test_primary_email}).first() |
| email.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| with h.push_config(config, **{'auth.allow_non_primary_email_password_reset': 'true'}): |
| self.app.post('/auth/password_recovery_hash', {'email': email1.email, |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| assert hash is not None |
| args, kwargs = sendmail.post.call_args |
| assert kwargs['toaddr'] == email1.email |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_password_reset(self, gen_message_id, sendsimplemail): |
| self.app.get('/').follow() # establish session |
| user = M.User.query.get(username='test-admin') |
| email = M.EmailAddress.find({'claimed_by_user_id': user._id}).first() |
| email.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| old_pw_hash = user.password |
| |
| # request a reset |
| with td.audits('Password recovery link sent to: ' + email.email, user=True): |
| r = self.app.post('/auth/password_recovery_hash', {'email': email.email, |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| # confirm some fields |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry') |
| assert hash is not None |
| assert hash_expiry is not None |
| |
| # confirm email sent |
| text = '''Your username is test-admin |
| |
| To update your password on %s, please visit the following URL: |
| |
| %s/auth/forgotten_password/%s''' % (config['site_name'], config['base_url'], hash) |
| sendsimplemail.post.assert_called_once_with( |
| sender='noreply@localhost', |
| toaddr=email.email, |
| fromaddr='"{}" <{}>'.format(config['site_name'], config['forgemail.return_path']), |
| reply_to=config['forgemail.return_path'], |
| subject='Allura Password recovery', |
| message_id=gen_message_id(), |
| text=text) |
| |
| # load reset form and fill it out |
| r = self.app.get('/auth/forgotten_password/%s' % hash) |
| assert 'Enter a new password for: test-admin' in r |
| assert 'New Password:' in r |
| assert 'New Password (again):' in r |
| form = r.forms[0] |
| form['pw'] = form['pw2'] = new_password = '154321' |
| with td.audits(r'Password changed \(through recovery process\)', user=True): |
| # escape parentheses, so they would not be treated as regex group |
| r = form.submit() |
| |
| # verify 'Password Changed' email sent |
| args, kwargs = sendsimplemail.post.call_args |
| assert kwargs['toaddr'] == user._id |
| assert kwargs['subject'] == 'Password Changed' |
| |
| # confirm password changed and works |
| user = M.User.query.get(username='test-admin') |
| assert old_pw_hash != user.password |
| provider = plugin.LocalAuthenticationProvider(None) |
| assert provider._validate_password(user, new_password) |
| |
| # confirm reset fields cleared |
| user = M.User.query.get(username='test-admin') |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry') |
| assert hash == '' |
| assert hash_expiry == '' |
| |
| # confirm can log in now in same session |
| r = r.follow() |
| assert 'Log Out' not in r, r |
| form = r.forms[0] |
| encoded = self.app.antispam_field_names(r.form) |
| form[encoded['username']] = 'test-admin' |
| form[encoded['password']] = new_password |
| r = form.submit(status=302) |
| r = r.follow().follow() |
| assert 'Log Out' in r, r |
| |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_hash_expired(self, gen_message_id, sendmail): |
| user = M.User.query.get(username='test-admin') |
| email = M.EmailAddress.find( |
| {'claimed_by_user_id': user._id}).first() |
| email.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| self.app.get('/').follow() # establish session |
| r = self.app.post('/auth/password_recovery_hash', {'email': email.email, |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| user = M.User.by_username('test-admin') |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| user.set_tool_data('AuthPasswordReset', |
| hash_expiry=datetime(2000, 10, 10)) |
| r = self.app.get('/auth/forgotten_password/%s' % hash.encode('utf-8')) |
| assert 'Unable to process reset, please try again' in r.follow().text |
| r = self.app.post('/auth/set_new_password/%s' % |
| hash.encode('utf-8'), {'pw': '154321', 'pw2': '154321', |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| assert 'Unable to process reset, please try again' in r.follow().text |
| |
| def test_hash_invalid(self): |
| r = self.app.get('/auth/forgotten_password/123412341234', status=302) |
| assert 'Unable to process reset, please try again' in r.follow().text |
| |
| @patch('allura.lib.plugin.AuthenticationProvider') |
| def test_provider_disabled(self, AP): |
| user = M.User.query.get(username='test-admin') |
| ap = AP.get() |
| ap.forgotten_password_process = False |
| ap.authenticate_request()._id = user._id |
| ap.by_username().username = user.username |
| self.app.get('/auth/forgotten_password', status=404) |
| self.app.get('/').follow() # establish session |
| self.app.post('/auth/set_new_password', |
| {'pw': 'foo', 'pw2': 'foo', '_session_id': self.app.cookies['_session_id']}, |
| status=404) |
| self.app.post('/auth/password_recovery_hash', |
| {'email': 'foo', '_session_id': self.app.cookies['_session_id']}, |
| status=404) |
| |
| @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True)) |
| @patch('allura.tasks.mail_tasks.sendsimplemail') |
| @patch('allura.lib.helpers.gen_message_id') |
| def test_pwd_reset_hibp_check(self, gen_message_id, sendmail): |
| self.app.get('/').follow() # establish session |
| user = M.User.query.get(username='test-admin') |
| email = M.EmailAddress.find({'claimed_by_user_id': user._id}).first() |
| email.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| |
| # request a reset |
| r = self.app.post('/auth/password_recovery_hash', {'email': email.email, |
| '_session_id': self.app.cookies['_session_id'], |
| }) |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| |
| # load reset form and fill it out with weak password |
| r = self.app.get('/auth/forgotten_password/%s' % hash) |
| form = r.forms[0] |
| form['pw'] = form['pw2'] = new_password = 'password' |
| r = form.submit() |
| assert 'Unsafe' in str(r.headers) |
| |
| # fill it out again, with a stronger password |
| r = r.follow() |
| form = r.forms[0] |
| form['pw'] = form['pw2'] = new_password = 'oj35h9u34280j924hnuiw' # something unlikely to trip at hibp |
| r = form.submit() |
| assert 'Unsafe' not in str(r.headers) |
| |
| # confirm password changed and works |
| user = M.User.query.get(username='test-admin') |
| provider = plugin.LocalAuthenticationProvider(None) |
| assert provider._validate_password(user, new_password) |
| |
| # confirm can log in now in same session |
| r = r.follow() |
| assert 'Log Out' not in r, r |
| form = r.forms[0] |
| encoded = self.app.antispam_field_names(r.form) |
| form[encoded['username']] = 'test-admin' |
| form[encoded['password']] = new_password |
| r = form.submit(status=302) |
| r = r.follow().follow() |
| assert 'Log Out' in r, r |
| |
| |
| @with_nose_compatibility |
| class TestOAuth(TestController): |
| def test_register_deregister_app(self): |
| # register |
| r = self.app.get('/auth/oauth/') |
| r = self.app.post('/auth/oauth/register', |
| params={'application_name': 'oautstapp', 'application_description': 'Oauth rulez', |
| '_session_id': self.app.cookies['_session_id'], |
| }).follow() |
| assert 'oautstapp' in r |
| # deregister |
| assert r.forms[0].action == 'deregister' |
| r.forms[0].submit() |
| r = self.app.get('/auth/oauth/') |
| assert 'oautstapp' not in r |
| |
| def test_generate_revoke_access_token(self): |
| # generate |
| self.app.get('/').follow() # establish session |
| r = self.app.post('/auth/oauth/register', |
| params={'application_name': 'oautstapp', 'application_description': 'Oauth rulez', |
| '_session_id': self.app.cookies['_session_id'], |
| }, status=302) |
| r = self.app.get('/auth/oauth/') |
| assert r.forms[1].action == 'generate_access_token' |
| r = r.forms[1].submit(extra_environ={'username': 'test-user'}) # not the right user |
| assert "Invalid app ID" in self.webflash(r) # gets an error |
| r = self.app.get('/auth/oauth/') # do it again |
| r = r.forms[1].submit() # as correct user |
| assert '' == self.webflash(r) |
| |
| r = self.app.get('/auth/oauth/') |
| assert 'Bearer Token:' in r |
| assert ( |
| M.OAuthAccessToken.for_user(M.User.by_username('test-admin')) != []) |
| # revoke |
| assert r.forms[0].action == 'revoke_access_token' |
| r.forms[0].submit() |
| r = self.app.get('/auth/oauth/') |
| assert r.forms[0].action != 'revoke_access_token' |
| assert ( |
| M.OAuthAccessToken.for_user(M.User.by_username('test-admin')) == []) |
| |
| def test_interactive(self): |
| user = M.User.by_username('test-admin') |
| M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| secret_key='test-client-secret', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| ThreadLocalORMSession.flush_all() |
| oauth_params = dict( |
| client_key='api_key_api_key_12345', |
| client_secret='test-client-secret', |
| callback_uri='http://my.domain.com/callback', |
| ) |
| r = self.app.post(*oauth1_webtest('/rest/oauth/request_token', oauth_params, method='POST')) |
| rtok = parse_qs(r.text)['oauth_token'][0] |
| rsecr = parse_qs(r.text)['oauth_token_secret'][0] |
| assert rtok |
| assert rsecr |
| r = self.app.post('/rest/oauth/authorize', |
| params={'oauth_token': rtok}) |
| r = r.forms[0].submit('yes') |
| assert r.location.startswith('http://my.domain.com/callback') |
| pin = parse_qs(urlparse(r.location).query)['oauth_verifier'][0] |
| assert pin |
| |
| oauth_params = dict( |
| client_key='api_key_api_key_12345', |
| client_secret='test-client-secret', |
| resource_owner_key=rtok, |
| resource_owner_secret=rsecr, |
| verifier=pin, |
| ) |
| r = self.app.get(*oauth1_webtest('/rest/oauth/access_token', oauth_params)) |
| atok = parse_qs(r.text) |
| assert_equal(len(atok['oauth_token']), 1) |
| assert_equal(len(atok['oauth_token_secret']), 1) |
| |
| # now use the tokens & secrets to make a full OAuth request: |
| oauth_token = atok['oauth_token'][0] |
| oauth_secret = atok['oauth_token_secret'][0] |
| oaurl, oaparams, oahdrs, oaextraenv = oauth1_webtest('/rest/p/test/', dict( |
| client_key='api_key_api_key_12345', |
| client_secret='test-client-secret', |
| resource_owner_key=oauth_token, |
| resource_owner_secret=oauth_secret, |
| signature_type='query' |
| )) |
| resp = self.app.get(oaurl, oaparams, oahdrs, oaextraenv, status=200) |
| for tool in resp.json['tools']: |
| if tool['name'] == 'admin': |
| break # good, found Admin |
| else: |
| raise AssertionError(f"No 'admin' tool in response, maybe authorizing as correct user failed. {resp.json}") |
| |
| # definitely bad request |
| self.app.get(oaurl.replace('oauth_signature=', 'removed='), oaparams, oahdrs, oaextraenv, status=401) |
| |
| def test_authorize_ok(self): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| consumer_token_id=ctok._id, |
| callback='oob', |
| user_id=user._id, |
| ) |
| ThreadLocalORMSession.flush_all() |
| r = self.app.post('/rest/oauth/authorize', params={'oauth_token': 'api_key_reqtok_12345'}) |
| assert_in('ctok_desc', r.text) |
| assert_in('api_key_reqtok_12345', r.text) |
| |
| def test_authorize_invalid(self): |
| resp = self.app.post('/rest/oauth/authorize', params={'oauth_token': 'api_key_reqtok_12345'}, status=400) |
| resp.mustcontain('error=invalid_client') |
| |
| def test_do_authorize_no(self): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| consumer_token_id=ctok._id, |
| callback='oob', |
| user_id=user._id, |
| ) |
| ThreadLocalORMSession.flush_all() |
| self.app.post('/rest/oauth/do_authorize', |
| params={'no': '1', 'oauth_token': 'api_key_reqtok_12345'}) |
| assert_is_none(M.OAuthRequestToken.query.get(api_key='api_key_reqtok_12345')) |
| |
| def test_do_authorize_oob(self): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| consumer_token_id=ctok._id, |
| callback='oob', |
| user_id=user._id, |
| ) |
| ThreadLocalORMSession.flush_all() |
| r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key_reqtok_12345'}) |
| assert_is_not_none(r.html.find(text=re.compile('^PIN: '))) |
| |
| def test_do_authorize_cb(self): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| consumer_token_id=ctok._id, |
| callback='http://my.domain.com/callback', |
| user_id=user._id, |
| ) |
| ThreadLocalORMSession.flush_all() |
| r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key_reqtok_12345'}) |
| assert r.location.startswith('http://my.domain.com/callback?oauth_token=api_key_reqtok_12345&oauth_verifier=') |
| |
| def test_do_authorize_cb_params(self): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| consumer_token_id=ctok._id, |
| callback='http://my.domain.com/callback?myparam=foo', |
| user_id=user._id, |
| ) |
| ThreadLocalORMSession.flush_all() |
| r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key_reqtok_12345'}) |
| assert r.location.startswith('http://my.domain.com/callback?myparam=foo&oauth_token=api_key_reqtok_12345&oauth_verifier=') |
| |
| |
| class TestOAuthRequestToken(TestController): |
| |
| oauth_params = dict( |
| client_key='api_key_api_key_12345', |
| client_secret='test-client-secret', |
| ) |
| |
| def setUp(self): |
| super().setUp() |
| dummy_oauths() |
| |
| def test_request_token_valid(self): |
| user = M.User.by_username('test-user') |
| consumer_token = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| secret_key='test-client-secret', |
| user_id=user._id, |
| ) |
| ThreadLocalORMSession.flush_all() |
| r = self.app.post(*oauth1_webtest('/rest/oauth/request_token', self.oauth_params, method='POST')) |
| r.mustcontain('oauth_token=') |
| r.mustcontain('oauth_token_secret=') |
| request_token = M.OAuthRequestToken.query.get(consumer_token_id=consumer_token._id) |
| assert_is_not_none(request_token) |
| |
| def test_request_token_no_consumer_token_matching(self): |
| self.app.post(*oauth1_webtest('/rest/oauth/request_token', self.oauth_params), status=401) |
| |
| def test_request_token_no_consumer_token_given(self): |
| oauth_params = self.oauth_params.copy() |
| oauth_params['signature_type'] = 'query' # so we can more easily remove a param next |
| url, params, hdrs, extraenv = oauth1_webtest('/rest/oauth/request_token', oauth_params) |
| url = url.replace('oauth_consumer_key', 'gone') |
| resp = self.app.post(url, params, hdrs, extraenv, status=400) |
| resp.mustcontain('error_description=Missing+mandatory+OAuth+parameters') |
| |
| def test_request_token_invalid(self): |
| user = M.User.by_username('test-user') |
| M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| secret_key='test-client-secret--INVALID', |
| ) |
| ThreadLocalORMSession.flush_all() |
| self.app.post(*oauth1_webtest('/rest/oauth/request_token', self.oauth_params, method='POST'), |
| status=401) |
| |
| |
| class TestOAuthAccessToken(TestController): |
| |
| oauth_params = dict( |
| client_key='api_key_api_key_12345', |
| client_secret='test-client-secret', |
| resource_owner_key='api_key_reqtok_12345', |
| resource_owner_secret='test-token-secret', |
| verifier='good_verifier_123456', |
| ) |
| |
| def setUp(self): |
| super().setUp() |
| dummy_oauths() |
| |
| def test_access_token_no_consumer(self): |
| self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params), status=401) |
| |
| def test_access_token_no_request(self): |
| user = M.User.by_username('test-admin') |
| M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| ThreadLocalORMSession.flush_all() |
| self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params), status=401) |
| |
| def test_access_token_bad_pin(self): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| consumer_token_id=ctok._id, |
| callback='http://my.domain.com/callback?myparam=foo', |
| user_id=user._id, |
| validation_pin='good_verifier_123456', |
| ) |
| ThreadLocalORMSession.flush_all() |
| oauth_params = self.oauth_params.copy() |
| oauth_params['verifier'] = 'bad_verifier_1234567' |
| self.app.get(*oauth1_webtest('/rest/oauth/access_token', oauth_params), |
| status=401) |
| |
| def test_access_token_bad_sig(self): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| user_id=user._id, |
| description='ctok_desc', |
| secret_key='test-client-secret', |
| ) |
| M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| consumer_token_id=ctok._id, |
| callback='http://my.domain.com/callback?myparam=foo', |
| user_id=user._id, |
| validation_pin='good_verifier_123456', |
| secret_key='test-token-secret--INVALID', |
| ) |
| ThreadLocalORMSession.flush_all() |
| self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params), status=401) |
| |
| def test_access_token_ok(self, signature_type='auth_header'): |
| user = M.User.by_username('test-admin') |
| ctok = M.OAuthConsumerToken( |
| api_key='api_key_api_key_12345', |
| secret_key='test-client-secret', |
| user_id=user._id, |
| description='ctok_desc', |
| ) |
| req_tok = M.OAuthRequestToken( |
| api_key='api_key_reqtok_12345', |
| secret_key='test-token-secret', |
| consumer_token_id=ctok._id, |
| callback='http://my.domain.com/callback?myparam=foo', |
| user_id=user._id, |
| validation_pin='good_verifier_123456', |
| ) |
| ThreadLocalORMSession.flush_all() |
| |
| oauth_params = dict(self.oauth_params, signature_type=signature_type) |
| r = self.app.get(*oauth1_webtest('/rest/oauth/access_token', self.oauth_params)) |
| atok = parse_qs(r.text) |
| assert len(atok['oauth_token']) == 1 |
| assert len(atok['oauth_token_secret']) == 1 |
| |
| def test_access_token_ok_by_query(self): |
| self.test_access_token_ok(signature_type='query') |
| |
| |
| @with_nose_compatibility |
| class TestDisableAccount(TestController): |
| def test_not_authenticated(self): |
| r = self.app.get( |
| '/auth/disable/', |
| extra_environ={'username': '*anonymous'}) |
| assert r.status_int == 302 |
| assert (r.location == |
| 'http://localhost/auth/?return_to=%2Fauth%2Fdisable%2F') |
| |
| def test_lists_user_projects(self): |
| r = self.app.get('/auth/disable/') |
| user = M.User.by_username('test-admin') |
| for p in user.my_projects_by_role_name('Admin'): |
| if p.name == 'u/test-admin': |
| continue |
| assert p.name in r |
| assert p.url() in r |
| |
| def test_has_asks_password(self): |
| r = self.app.get('/auth/disable/') |
| form = r.html.find('form', {'action': 'do_disable'}) |
| assert form is not None |
| |
| def test_bad_password(self): |
| self.app.get('/').follow() # establish session |
| r = self.app.post('/auth/disable/do_disable', {'password': 'bad', |
| '_session_id': self.app.cookies['_session_id'], }) |
| assert 'Invalid password' in r |
| user = M.User.by_username('test-admin') |
| assert user.disabled == False |
| |
| def test_disable(self): |
| self.app.get('/').follow() # establish session |
| r = self.app.post('/auth/disable/do_disable', {'password': 'foo', |
| '_session_id': self.app.cookies['_session_id'], }) |
| assert r.status_int == 302 |
| assert r.location == 'http://localhost/' |
| flash = json.loads(self.webflash(r)) |
| assert flash['status'] == 'ok' |
| assert flash['message'] == 'Your account was successfully disabled!' |
| user = M.User.by_username('test-admin') |
| assert user.disabled == True |
| |
| |
| @with_nose_compatibility |
| class TestPasswordExpire(TestController): |
| def login(self, username='test-user', pwd='foo', query_string=''): |
| extra = {'username': '*anonymous', 'REMOTE_ADDR': '127.0.0.1'} |
| r = self.app.get('/auth/' + query_string, extra_environ=extra) |
| |
| f = r.forms[0] |
| encoded = self.app.antispam_field_names(f) |
| f[encoded['username']] = username |
| f[encoded['password']] = pwd |
| return f.submit(extra_environ={'username': '*anonymous'}) |
| |
| def assert_redirects(self, where='/'): |
| resp = self.app.get(where, extra_environ={'username': 'test-user'}, status=302) |
| assert resp.location == 'http://localhost/auth/pwd_expired?' + urlencode({'return_to': where}) |
| |
| def assert_not_redirects(self, where='/neighborhood'): |
| self.app.get(where, extra_environ={'username': 'test-user'}, status=200) |
| |
| def test_disabled(self): |
| r = self.login() |
| assert not r.session.get('pwd-expired') |
| self.assert_not_redirects() |
| |
| def expired(self, r): |
| return r.session.get('pwd-expired') |
| |
| def set_expire_for_user(self, username='test-user', days=100): |
| user = M.User.by_username(username) |
| user.last_password_updated = datetime.utcnow() - timedelta(days=days) |
| session(user).flush(user) |
| return user |
| |
| def test_days(self): |
| self.set_expire_for_user() |
| |
| with h.push_config(config, **{'auth.pwdexpire.days': 180}): |
| r = self.login() |
| assert not self.expired(r) |
| self.assert_not_redirects() |
| |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login() |
| assert self.expired(r) |
| self.assert_redirects() |
| |
| def test_before(self): |
| self.set_expire_for_user() |
| |
| before = datetime.utcnow() - timedelta(days=180) |
| before = calendar.timegm(before.timetuple()) |
| with h.push_config(config, **{'auth.pwdexpire.before': before}): |
| r = self.login() |
| assert not self.expired(r) |
| self.assert_not_redirects() |
| |
| before = datetime.utcnow() - timedelta(days=90) |
| before = calendar.timegm(before.timetuple()) |
| with h.push_config(config, **{'auth.pwdexpire.before': before}): |
| r = self.login() |
| assert self.expired(r) |
| self.assert_redirects() |
| |
| def test_logout(self): |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login() |
| assert self.expired(r) |
| self.assert_redirects() |
| r = self.app.get('/auth/logout', extra_environ={'username': 'test-user'}) |
| assert not self.expired(r) |
| self.assert_not_redirects() |
| |
| def test_change_pwd(self): |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login() |
| assert self.expired(r) |
| self.assert_redirects() |
| |
| user = M.User.by_username('test-user') |
| old_update_time = user.last_password_updated |
| old_password = user.password |
| r = self.app.get('/auth/pwd_expired', extra_environ={'username': 'test-user'}) |
| f = r.forms[0] |
| f['oldpw'] = 'foo' |
| f['pw'] = 'qwerty' |
| f['pw2'] = 'qwerty' |
| r = f.submit(extra_environ={'username': 'test-user'}, status=302) |
| assert r.location == 'http://localhost/' |
| assert not self.expired(r) |
| user = M.User.by_username('test-user') |
| assert user.last_password_updated > old_update_time |
| assert user.password != old_password |
| |
| # Can log in with new password and change isn't required anymore |
| r = self.login(pwd='qwerty').follow() |
| assert r.location == 'http://localhost/dashboard' |
| assert 'Invalid login' not in r |
| assert not self.expired(r) |
| self.assert_not_redirects() |
| |
| # and can't log in with old password |
| r = self.login(pwd='foo') |
| assert 'Invalid login' in r |
| |
| def test_expired_pwd_change_invalidates_token(self): |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login() |
| assert self.expired(r) |
| self.assert_redirects() |
| user = M.User.by_username('test-user') |
| user.set_tool_data('AuthPasswordReset', |
| hash="generated_hash_value", |
| hash_expiry="04-08-2020") |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry') |
| assert hash == 'generated_hash_value' |
| assert hash_expiry == '04-08-2020' |
| session(user).flush(user) |
| |
| # Change expired password |
| r = self.app.get('/auth/pwd_expired', extra_environ={'username': 'test-user'}) |
| f = r.forms[0] |
| f['oldpw'] = 'foo' |
| f['pw'] = 'qwerty' |
| f['pw2'] = 'qwerty' |
| r = f.submit(extra_environ={'username': 'test-user'}, status=302) |
| assert r.location == 'http://localhost/' |
| |
| user = M.User.by_username('test-user') |
| hash = user.get_tool_data('AuthPasswordReset', 'hash') |
| hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry') |
| |
| assert hash == '' |
| assert hash_expiry == '' |
| |
| def check_validation(self, oldpw, pw, pw2): |
| user = M.User.by_username('test-user') |
| old_update_time = user.last_password_updated |
| old_password = user.password |
| r = self.app.get('/auth/pwd_expired', extra_environ={'username': 'test-user'}) |
| f = r.forms[0] |
| f['oldpw'] = oldpw |
| f['pw'] = pw |
| f['pw2'] = pw2 |
| r = f.submit(extra_environ={'username': 'test-user'}) |
| assert self.expired(r) |
| user = M.User.by_username('test-user') |
| assert user.last_password_updated == old_update_time |
| assert user.password == old_password |
| return r |
| |
| def test_change_pwd_validation(self): |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login() |
| assert self.expired(r) |
| self.assert_redirects() |
| |
| r = self.check_validation('', '', '') |
| assert 'Please enter a value' in r |
| r = self.check_validation('', 'qwe', 'qwerty') |
| assert 'Enter a value 6 characters long or more' in r |
| r = self.check_validation('bad', 'qwerty1', 'qwerty') |
| assert 'Passwords must match' in r |
| r = self.check_validation('bad', 'qwerty', 'qwerty') |
| assert 'Incorrect password' in self.webflash(r) |
| assert r.location == 'http://localhost/auth/pwd_expired?return_to=' |
| |
| with h.push_config(config, **{'auth.min_password_len': 3}): |
| r = self.check_validation('foo', 'foo', 'foo') |
| assert 'Your old and new password should not be the same' in r |
| |
| def test_return_to(self): |
| return_to = '/p/test/tickets/?milestone=1.0&page=2' |
| self.set_expire_for_user() |
| with h.push_config(config, **{'auth.pwdexpire.days': 90}): |
| r = self.login(query_string='?' + urlencode({'return_to': return_to})) |
| # don't go to the return_to yet |
| assert r.location == 'http://localhost/auth/pwd_expired?' + urlencode({'return_to': return_to}) |
| |
| # but if user tries to go directly there anyway, intercept and redirect back |
| self.assert_redirects(where=return_to) |
| |
| r = self.app.get('/auth/pwd_expired', extra_environ={'username': 'test-user'}) |
| f = r.forms[0] |
| f['oldpw'] = 'foo' |
| f['pw'] = 'qwerty' |
| f['pw2'] = 'qwerty' |
| f['return_to'] = return_to |
| r = f.submit(extra_environ={'username': 'test-user'}, status=302) |
| assert r.location == 'http://localhost/p/test/tickets/?milestone=1.0&page=2' |
| |
| |
| @with_nose_compatibility |
| class TestCSRFProtection(TestController): |
| def test_blocks_invalid(self): |
| # so test-admin isn't automatically logged in for all requests |
| self.app.extra_environ = {'disable_auth_magic': 'True', 'REMOTE_ADDR': '127.0.0.1'} |
| |
| # regular login |
| r = self.app.get('/auth/') |
| |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-admin', password='foo', |
| _session_id=self.app.cookies['_session_id']), |
| antispam=True) |
| |
| # regular form submit |
| r = self.app.get('/admin/overview') |
| r = r.form.submit() |
| assert r.location == 'http://localhost/admin/overview' |
| |
| # invalid form submit |
| r = self.app.get('/admin/overview') |
| r.form['_session_id'] = 'bogus' |
| r = r.form.submit() |
| assert r.location == 'http://localhost/auth/' |
| |
| def test_blocks_invalid_on_login(self): |
| r = self.app.get('/auth/') |
| r.form['_session_id'] = 'bogus' |
| r.form.submit(status=403) |
| |
| def test_token_present_on_first_request(self): |
| r = self.app.get('/auth/') |
| assert r.form['_session_id'].value |
| |
| |
| @with_nose_compatibility |
| class TestTwoFactor(TestController): |
| |
| sample_key = b'\x00K\xda\xbfv\xc2B\xaa\x1a\xbe\xa5\x96b\xb2\xa0Z:\xc9\xcf\x8a' |
| sample_b32 = 'ABF5VP3WYJBKUGV6UWLGFMVALI5MTT4K' |
| |
| def _init_totp(self, username='test-admin'): |
| user = M.User.query.get(username=username) |
| totp_srv = TotpService().get() |
| totp_srv.set_secret_key(user, self.sample_key) |
| user.set_pref('multifactor', True) |
| |
| def test_settings_on(self): |
| r = self.app.get('/auth/preferences/') |
| assert r.html.find(attrs={'class': 'preferences multifactor'}) |
| |
| def test_settings_off(self): |
| with h.push_config(config, **{'auth.multifactor.totp': 'false'}): |
| r = self.app.get('/auth/preferences/') |
| assert not r.html.find(attrs={'class': 'preferences multifactor'}) |
| |
| for url in ['/auth/preferences/totp_new', |
| '/auth/preferences/totp_view', |
| '/auth/preferences/totp_set', |
| '/auth/preferences/totp_send_link', |
| '/auth/preferences/multifactor_disable', |
| '/auth/preferences/multifactor_recovery', |
| '/auth/preferences/multifactor_recovery_regen', |
| '/auth/multifactor', |
| '/auth/do_multifactor', |
| ]: |
| self.app.post(url, |
| {'password': 'foo', '_session_id': self.app.cookies['_session_id']}, |
| status=404) |
| |
| def test_user_disabled(self): |
| r = self.app.get('/auth/preferences/') |
| info_html = str(r.html.find(attrs={'class': 'preferences multifactor'})) |
| assert 'disabled' in info_html |
| |
| def test_user_enabled(self): |
| self._init_totp() |
| r = self.app.get('/auth/preferences/') |
| info_html = str(r.html.find(attrs={'class': 'preferences multifactor'})) |
| assert 'enabled' in info_html |
| |
| def test_reconfirm_auth(self): |
| from datetime import datetime as real_datetime |
| with patch('allura.lib.decorators.datetime') as datetime: |
| datetime.min = real_datetime.min |
| |
| # reconfirm required at first |
| datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 0, 0) |
| r = self.app.get('/auth/preferences/totp_new') |
| assert 'Password Confirmation' in r |
| |
| # submit form, and its not required |
| r.form['password'] = 'foo' |
| r = r.form.submit() |
| assert 'Password Confirmation' not in r |
| |
| # still not required |
| datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 1, 45) |
| r = self.app.get('/auth/preferences/totp_new') |
| assert 'Password Confirmation' not in r |
| |
| # required later |
| datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 2, 3) |
| r = self.app.get('/auth/preferences/totp_new') |
| assert 'Password Confirmation' in r |
| |
| def test_enable_totp(self): |
| # create a separate session, for later use in the test |
| other_session = TestController() |
| other_session.setup_method(method) |
| other_session.app.get('/auth/preferences/') |
| |
| with out_audits(user=True): |
| r = self.app.get('/auth/preferences/totp_new') |
| assert 'Password Confirmation' in r |
| |
| with audits('Visited multifactor new TOTP page', user=True): |
| r.form['password'] = 'foo' |
| r = r.form.submit() |
| assert 'Scan this' in r |
| assert 'Or enter setup key: ' in r |
| |
| first_key_shown = r.session['totp_new_key'] |
| |
| with audits(r'Failed to set up multifactor TOTP \(wrong code\)', user=True): |
| form = r.forms['totp_set'] |
| form['code'] = '' |
| r = form.submit() |
| assert 'Invalid' in r |
| assert f'Or enter setup key: {b32encode(first_key_shown).decode()}' in r |
| assert first_key_shown == r.session['totp_new_key'] # different keys on each pageload would be bad! |
| |
| new_totp = TotpService().Totp(r.session['totp_new_key']) |
| code = new_totp.generate(time_time()) |
| form = r.forms['totp_set'] |
| form['code'] = code |
| with audits('Set up multifactor TOTP', user=True): |
| r = form.submit() |
| assert 'Two factor authentication has now been set up.' == json.loads(self.webflash(r))['message'], self.webflash(r) |
| |
| tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all() |
| assert len(tasks) == 1 |
| assert tasks[0].kwargs['subject'] == 'Two-Factor Authentication Enabled' |
| assert 'new two-factor authentication' in tasks[0].kwargs['text'] |
| |
| r = r.follow() |
| assert 'Recovery Codes' in r |
| |
| # Confirm any pre-existing sessions have to re-authenticate |
| r = other_session.app.get('/auth/preferences/') |
| assert '/auth/?return_to' in r.headers['Location'] |
| other_session.teardown_method(method) |
| |
| def test_reset_totp(self): |
| self._init_totp() |
| |
| # access page |
| r = self.app.get('/auth/preferences/totp_new') |
| assert 'Password Confirmation' in r |
| |
| # reconfirm password to get to it |
| r.form['password'] = 'foo' |
| r = r.form.submit() |
| |
| # confirm warning message, and key is not changed yet |
| assert 'Scan this' in r |
| assert 'Or enter setup key: ' in r |
| assert 'this will invalidate your previous' in r |
| current_key = TotpService.get().get_secret_key(M.User.query.get(username='test-admin')) |
| assert self.sample_key == current_key |
| |
| # incorrect submission |
| form = r.forms['totp_set'] |
| form['code'] = '' |
| r = form.submit() |
| assert 'Invalid' in r |
| |
| # still unchanged key |
| current_key = TotpService.get().get_secret_key(M.User.query.get(username='test-admin')) |
| assert self.sample_key == current_key |
| |
| # valid submission |
| new_key = r.session['totp_new_key'] |
| new_totp = TotpService().Totp(new_key) |
| code = new_totp.generate(time_time()) |
| form = r.forms['totp_set'] |
| form['code'] = code |
| r = form.submit() |
| assert 'Two factor authentication has now been set up.' == json.loads(self.webflash(r))['message'], self.webflash(r) |
| |
| # new key in place |
| current_key = TotpService.get().get_secret_key(M.User.query.get(username='test-admin')) |
| assert new_key == current_key |
| assert self.sample_key != current_key |
| |
| def test_disable(self): |
| self._init_totp() |
| |
| self.app.get('/auth/preferences/multifactor_disable', status=405) # GET not allowed |
| |
| # get form and submit |
| r = self.app.get('/auth/preferences/') |
| form = r.forms['multifactor_disable'] |
| r = form.submit() |
| |
| # confirm first, no change |
| assert 'Password Confirmation' in r |
| user = M.User.query.get(username='test-admin') |
| assert user.get_pref('multifactor') == True |
| |
| # confirm submit, everything goes off |
| r.form['password'] = 'foo' |
| with audits('Disabled multifactor TOTP', user=True): |
| r = r.form.submit() |
| assert 'Multifactor authentication has now been disabled.' == json.loads(self.webflash(r))['message'], self.webflash(r) |
| user = M.User.query.get(username='test-admin') |
| assert user.get_pref('multifactor') == False |
| assert TotpService().get().get_secret_key(user) == None |
| assert RecoveryCodeService().get().get_codes(user) == [] |
| |
| # email confirmation |
| tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all() |
| assert len(tasks) == 1 |
| assert tasks[0].kwargs['subject'] == 'Two-Factor Authentication Disabled' |
| assert 'disabled two-factor authentication' in tasks[0].kwargs['text'] |
| |
| def test_login_totp(self): |
| self._init_totp() |
| |
| # so test-admin isn't automatically logged in for all requests |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| |
| # regular login |
| r = self.app.get('/auth/?return_to=/p/foo') |
| encoded = self.app.antispam_field_names(r.form) |
| r.form[encoded['username']] = 'test-admin' |
| r.form[encoded['password']] = 'foo' |
| with audits('Multifactor login - password ok, code not entered yet', user=True): |
| r = r.form.submit() |
| |
| # check results |
| assert r.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), r |
| r = r.follow() |
| assert not r.session.get('username') |
| |
| # try an invalid code |
| r.form['code'] = 'invalid-code' |
| with audits('Multifactor login - invalid code', user=True): |
| r = r.form.submit() |
| assert 'Invalid code' in r |
| assert not r.session.get('username') |
| |
| # use a valid code |
| totp = TotpService().Totp(self.sample_key) |
| code = totp.generate(time_time()) |
| r.form['code'] = code |
| with audits('Successful login', user=True): |
| r = r.form.submit() |
| |
| # confirm login and final page |
| assert r.session['username'] == 'test-admin' |
| assert r.location.endswith('/p/foo'), r |
| |
| def test_login_rate_limit(self): |
| self._init_totp() |
| |
| # so test-admin isn't automatically logged in for all requests |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| |
| # regular login |
| r = self.app.get('/auth/?return_to=/p/foo') |
| encoded = self.app.antispam_field_names(r.form) |
| |
| r.form[encoded['username']] = 'test-admin' |
| r.form[encoded['password']] = 'foo' |
| r = r.form.submit() |
| r = r.follow() |
| |
| # try some invalid codes |
| for i in range(3): |
| r.form['code'] = 'invalid-code' |
| r = r.form.submit() |
| assert 'Invalid code' in r |
| |
| # use a valid code, but it'll hit rate limit |
| totp = TotpService().Totp(self.sample_key) |
| code = totp.generate(time_time()) |
| r.form['code'] = code |
| with audits('Multifactor login - rate limit', user=True): |
| r = r.form.submit() |
| |
| assert 'rate limit exceeded' in r |
| assert not r.session.get('username') |
| |
| def test_login_totp_disrupted(self): |
| self._init_totp() |
| |
| # so test-admin isn't automatically logged in for all requests |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| |
| # regular login |
| r = self.app.get('/auth/') |
| encoded = self.app.antispam_field_names(r.form) |
| r.form[encoded['username']] = 'test-admin' |
| r.form[encoded['password']] = 'foo' |
| r = r.form.submit() |
| r = r.follow() |
| |
| # go to some other page instead of filling out the 2FA code |
| other_r = self.app.get('/') |
| |
| # then try to complete the 2FA form |
| totp = TotpService().Totp(self.sample_key) |
| code = totp.generate(time_time()) |
| r.form['code'] = code |
| r = r.form.submit() |
| |
| # sent back to regular login |
| assert ('Your multifactor login was disrupted, please start over.' == |
| json.loads(self.webflash(r))['message']), self.webflash(r) |
| r = r.follow() |
| assert 'Password Login' in r |
| |
| def test_login_recovery_code(self): |
| self._init_totp() |
| |
| # so test-admin isn't automatically logged in for all requests |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| |
| # regular login |
| r = self.app.get('/auth/?return_to=/p/foo') |
| encoded = self.app.antispam_field_names(r.form) |
| r.form[encoded['username']] = 'test-admin' |
| r.form[encoded['password']] = 'foo' |
| r = r.form.submit() |
| |
| # check results |
| assert r.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), r |
| r = r.follow() |
| assert not r.session.get('username') |
| |
| # change login mode |
| r.form['mode'] = 'recovery' |
| |
| # try an invalid code |
| r.form['code'] = 'invalid-code' |
| r = r.form.submit() |
| assert 'Invalid code' in r |
| assert not r.session.get('username') |
| |
| # use a valid code |
| user = M.User.by_username('test-admin') |
| recovery = RecoveryCodeService().get() |
| recovery.regenerate_codes(user) |
| recovery_code = recovery.get_codes(user)[0] |
| r.form['code'] = recovery_code |
| with audits('Logged in using a multifactor recovery code', user=True): |
| r = r.form.submit() |
| |
| # confirm login and final page |
| assert r.session['username'] == 'test-admin' |
| assert r.location.endswith('/p/foo'), r |
| |
| # confirm code used up |
| assert recovery_code not in RecoveryCodeService().get().get_codes(user) |
| |
| @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True)) |
| def test_login_totp_with_hibp(self): |
| # this is essentially the same as regular TOTP test, just making sure that HIBP doesn't get in the way |
| # or cause any problems. It shouldn't even run since a password isn't present when the final login happens |
| |
| self._init_totp() |
| |
| # so test-admin isn't automatically logged in for all requests |
| self.app.extra_environ = {'disable_auth_magic': 'True'} |
| |
| # regular login |
| r = self.app.get('/auth/?return_to=/p/foo') |
| encoded = self.app.antispam_field_names(r.form) |
| r.form[encoded['username']] = 'test-admin' |
| r.form[encoded['password']] = 'foo' |
| with audits('Multifactor login - password ok, code not entered yet', user=True): |
| r = r.form.submit() |
| |
| # check results |
| assert r.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), r |
| r = r.follow() |
| assert not r.session.get('username') |
| |
| # use a valid code |
| totp = TotpService().Totp(self.sample_key) |
| code = totp.generate(time_time()) |
| r.form['code'] = code |
| with audits('Successful login', user=True): |
| r = r.form.submit() |
| |
| # confirm login and final page |
| assert r.session['username'] == 'test-admin' |
| assert r.location.endswith('/p/foo'), r |
| |
| def test_view_key(self): |
| self._init_totp() |
| |
| with out_audits(user=True): |
| r = self.app.get('/auth/preferences/totp_view') |
| assert 'Password Confirmation' in r |
| |
| with audits('Viewed multifactor TOTP config page', user=True): |
| r.form['password'] = 'foo' |
| r = r.form.submit() |
| assert 'Scan this' in r |
| assert f'Or enter setup key: {self.sample_b32}' in r |
| |
| def test_view_recovery_codes_and_regen(self): |
| self._init_totp() |
| |
| # reconfirm password |
| with out_audits(user=True): |
| r = self.app.get('/auth/preferences/multifactor_recovery') |
| assert 'Password Confirmation' in r |
| |
| # actual visit |
| with audits('Viewed multifactor recovery codes', user=True): |
| r.form['password'] = 'foo' |
| r = r.form.submit() |
| assert 'Download' in r |
| assert 'Print' in r |
| |
| # regenerate codes |
| with audits('Regenerated multifactor recovery codes', user=True): |
| r = r.forms['multifactor_recovery_regen'].submit() |
| |
| # email confirmation |
| tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all() |
| assert len(tasks) == 1 |
| assert tasks[0].kwargs['subject'] == 'Two-Factor Recovery Codes Regenerated' |
| assert 'regenerated' in tasks[0].kwargs['text'] |
| |
| def test_send_links(self): |
| r = self.app.get('/auth/preferences/totp_new') |
| r.form['password'] = 'foo' |
| r = r.form.submit() |
| |
| r = r.forms['totp_send_link'].submit() |
| |
| tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all() |
| assert len(tasks) == 1 |
| assert tasks[0].kwargs['subject'] == 'Two-Factor Authentication Apps' |
| assert 'itunes.apple.com' in tasks[0].kwargs['text'] |
| assert 'play.google.com' in tasks[0].kwargs['text'] |