| import json |
| |
| from datadiff.tools import assert_equal |
| from pylons import c |
| from allura.tests import TestController |
| from allura.tests import decorators as td |
| from allura import model as M |
| from ming.orm.ormsession import ThreadLocalORMSession |
| |
| |
| def unentity(s): |
| return s.replace('"', '"') |
| |
| class TestAuth(TestController): |
| |
| def test_login(self): |
| result = self.app.get('/auth/') |
| r = self.app.post('/auth/send_verification_link', params=dict(a='test@example.com')) |
| email = M.User.query.get(username='test-admin').email_addresses[0] |
| r = self.app.post('/auth/send_verification_link', params=dict(a=email)) |
| ThreadLocalORMSession.flush_all() |
| r = self.app.get('/auth/verify_addr', params=dict(a='foo')) |
| assert json.loads(self.webflash(r))['status'] == 'error', self.webflash(r) |
| ea = M.EmailAddress.query.find().first() |
| r = self.app.get('/auth/verify_addr', params=dict(a=ea.nonce)) |
| assert json.loads(self.webflash(r))['status'] == 'ok', self.webflash(r) |
| r = self.app.get('/auth/logout') |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='foo')) |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-user', password='food')) |
| assert 'Invalid login' in str(r), r.showbrowser() |
| r = self.app.post('/auth/do_login', params=dict( |
| username='test-usera', password='foo')) |
| assert 'Invalid login' in str(r), r.showbrowser() |
| |
| @td.with_user_project('test-admin') |
| def test_prefs(self): |
| r = self.app.get('/auth/prefs/', extra_environ=dict(username='test-admin')) |
| assert 'test@example.com' not in r |
| subscriptions = M.Mailbox.query.find(dict(user_id=c.user._id, is_flash=False)).all() |
| # make sure page actually lists all the user's subscriptions |
| assert len(subscriptions) > 0, 'Test user has no subscriptions, cannot verify that they are shown' |
| for m in subscriptions: |
| assert m._id in r, "Page doesn't list subscription for Mailbox._id = %s" % m._id |
| r = self.app.post('/auth/prefs/update', params={ |
| 'display_name':'Test Admin', |
| 'new_addr.addr':'test@example.com', |
| 'new_addr.claim':'Claim Address', |
| 'primary_addr':'test-admin@users.localhost', |
| 'preferences.email_format':'plain'}, |
| extra_environ=dict(username='test-admin')) |
| r = self.app.get('/auth/prefs/') |
| assert 'test@example.com' in r |
| r = self.app.post('/auth/prefs/update', params={ |
| 'display_name':'Test Admin', |
| 'addr-1.ord':'1', |
| 'addr-2.ord':'1', |
| 'addr-2.delete':'on', |
| 'new_addr.addr':'', |
| 'primary_addr':'test-admin@users.localhost', |
| 'preferences.email_format':'plain'}, |
| extra_environ=dict(username='test-admin')) |
| r = self.app.get('/auth/prefs/') |
| assert 'test@example.com' not in r |
| ea = M.EmailAddress.query.get(_id='test-admin@users.localhost') |
| ea.confirmed = True |
| ThreadLocalORMSession.flush_all() |
| r = self.app.post('/auth/prefs/update', params={ |
| 'display_name':'Test Admin', |
| 'new_addr.addr':'test-admin@users.localhost', |
| 'new_addr.claim':'Claim Address', |
| 'primary_addr':'test-admin@users.localhost', |
| 'preferences.email_format':'plain'}, |
| extra_environ=dict(username='test-admin')) |
| |
| def test_api_key(self): |
| r = self.app.get('/auth/prefs/') |
| assert 'No API token generated' in r |
| r = self.app.post('/auth/prefs/gen_api_token', status=302) |
| r = self.app.get('/auth/prefs/') |
| assert 'No API token generated' not in r |
| assert 'API Key:' in r |
| assert 'Secret Key:' in r |
| r = self.app.post('/auth/prefs/del_api_token', status=302) |
| r = self.app.get('/auth/prefs/') |
| assert 'No API token generated' in r |
| |
| def test_oauth(self): |
| r = self.app.get('/auth/oauth/') |
| r = self.app.post('/auth/oauth/register', params={'application_name': 'oautstapp', 'application_description': 'Oauth rulez'}).follow() |
| assert 'oautstapp' in r |
| r = self.app.post('/auth/oauth/delete').follow() |
| assert 'Invalid app ID' in r |
| |
| def test_openid(self): |
| result = self.app.get('/auth/login_verify_oid', params=dict( |
| provider='http://www.google.com/accounts/o8/id', username='rick446@usa.net')) |
| assert '<form' in result.body |
| result = self.app.get('/auth/login_verify_oid', params=dict( |
| provider='http://www.google.com/accounts/', username='rick446@usa.net'), |
| status=302) |
| assert json.loads(self.webflash(result))['status'] == 'error', self.webflash(result) |
| result = self.app.get('/auth/login_verify_oid', params=dict( |
| provider='', username='http://blog.pythonisito.com')) |
| assert result.status_int == 302 |
| r = self.app.get('/auth/setup_openid_user') |
| r = self.app.post('/auth/do_setup_openid_user', params=dict( |
| username='test-admin', display_name='Test Admin')) |
| r = self.app.post('/auth/do_setup_openid_user', params=dict( |
| username='test-user', display_name='Test User')) |
| r = self.app.post('/auth/do_setup_openid_user', params=dict( |
| username='test-admin', display_name='Test Admin')) |
| r = self.app.get('/auth/claim_oid') |
| result = self.app.get('/auth/claim_verify_oid', params=dict( |
| provider='http://www.google.com/accounts/o8/id', username='rick446@usa.net')) |
| assert '<form' in result.body |
| result = self.app.get('/auth/claim_verify_oid', params=dict( |
| provider='', username='http://blog.pythonisito.com')) |
| assert result.status_int == 302 |
| |
| def test_create_account(self): |
| r = self.app.get('/auth/create_account') |
| assert 'Create an Account' in r |
| r = self.app.post('/auth/save_new', params=dict(username='aaa',pw='123')) |
| assert 'Enter a value 8 characters long or more' in r |
| r = self.app.post( |
| '/auth/save_new', |
| params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me')) |
| r = r.follow() |
| assert 'User "Test Me" registered' in unentity(r.body) |
| r = self.app.post( |
| '/auth/save_new', |
| params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me')) |
| assert 'That username is already taken. Please choose another.' in r |
| r = self.app.get('/auth/logout') |
| r = self.app.post( |
| '/auth/do_login', |
| params=dict(username='aaa', password='12345678'), |
| status=302) |
| |
| def test_one_project_role(self): |
| """Make sure when a user goes to a new project only one project role is created. |
| There was an issue with extra project roles getting created if a user went directly to |
| an admin page.""" |
| p_nbhd = M.Neighborhood.query.get(name='Projects') |
| p = M.Project.query.get(shortname='test', neighborhood_id=p_nbhd._id) |
| self.app.post('/auth/save_new', params=dict( |
| username='aaa', |
| pw='12345678', |
| pw2='12345678', |
| display_name='Test Me')).follow() |
| user = M.User.query.get(username='aaa') |
| assert M.ProjectRole.query.find(dict(user_id=user._id, project_id=p._id)).count() == 0 |
| r = self.app.get('/p/test/admin/permissions',extra_environ=dict(username='aaa'), status=403) |
| assert M.ProjectRole.query.find(dict(user_id=user._id, project_id=p._id)).count() <= 1 |
| |
| def test_default_lookup(self): |
| # Make sure that default _lookup() throws 404 |
| self.app.get('/auth/foobar', status=404) |
| |
| @td.with_svn |
| def test_refresh_repo(self): |
| r = self.app.get('/auth/refresh_repo') |
| assert_equal(r.body, 'No repo specified') |
| |
| r = self.app.get('/auth/refresh_repo/p/gbalksdfh') |
| assert_equal(r.body, 'No project at /p/gbalksdfh') |
| |
| r = self.app.get('/auth/refresh_repo/p/test') |
| assert_equal(r.body, '/p/test does not include a repo mount point') |
| |
| r = self.app.get('/auth/refresh_repo/p/test/blah/') |
| assert_equal(r.body, 'Cannot find repo at /p/test/blah') |
| |
| r = self.app.get('/auth/refresh_repo/p/test/src/') |
| assert_equal(r.body, '<Repository /tmp/svn/p/test/src> refresh queued.\n') |
| |
| class TestUserPermissions(TestController): |
| allow = dict(allow_read=True, allow_write=True, allow_create=True) |
| read = dict(allow_read=True, allow_write=False, allow_create=False) |
| disallow = dict(allow_read=False, allow_write=False, allow_create=False) |
| |
| def test_unknown_project(self): |
| r = self._check_repo('/git/foo/bar', status=404) |
| |
| def test_unknown_app(self): |
| r = self._check_repo('/git/test/bar') |
| assert r == self.disallow, r |
| |
| @td.with_svn |
| def test_repo_write(self): |
| r = self._check_repo('/git/test/src.git') |
| assert r == self.allow, r |
| r = self._check_repo('/git/test/src') |
| assert r == self.allow, r |
| |
| @td.with_svn |
| def test_subdir(self): |
| r = self._check_repo('/git/test/src.git/foo') |
| assert r == self.allow, r |
| r = self._check_repo('/git/test/src/foo') |
| assert r == self.allow, r |
| |
| @td.with_svn |
| def test_neighborhood(self): |
| r = self._check_repo('/git/test.p/src.git') |
| assert r == self.allow, r |
| |
| @td.with_svn |
| def test_repo_read(self): |
| r = self._check_repo( |
| '/git/test.p/src.git', |
| username='test-user') |
| assert r == self.read, r |
| |
| def test_unknown_user(self): |
| r = self._check_repo( |
| '/git/test.p/src.git', |
| username='test-usera', |
| status=404) |
| |
| def _check_repo(self, path, username='test-admin', **kw): |
| url = '/auth/repo_permissions' |
| r = self.app.get(url, params=dict( |
| repo_path=path, |
| username=username), **kw) |
| try: |
| return r.json |
| except: |
| return r |
| |
| @td.with_repos |
| def test_list_repos(self): |
| r = self.app.get('/auth/repo_permissions', params=dict(username='test-admin'), status=200) |
| assert_equal(json.loads(r.body), {"allow_write": [ |
| '/git/test/src-git', |
| '/hg/test/src-hg', |
| '/svn/test/src', |
| ]}) |