blob: 0f03acb9ea4e92d0e39e3bb13dd05230aa011420 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Model tests for auth
"""
from nose.tools import (
with_setup,
assert_equal,
assert_not_equal,
assert_true,
assert_not_in,
assert_in,
)
from tg import tmpl_context as c, app_globals as g, request
from webob import Request
from mock import patch, Mock
from datetime import datetime, timedelta
from ming.orm.ormsession import ThreadLocalORMSession
from ming.odm import session
from allura import model as M
from allura.lib import helpers as h
from allura.lib import plugin
from allura.tests import decorators as td
from alluratest.controller import setup_basic_test, setup_global_objects, setup_functional_test
def setUp():
setup_basic_test()
ThreadLocalORMSession.close_all()
setup_global_objects()
@with_setup(setUp)
def test_email_address():
addr = M.EmailAddress(email='test_admin@domain.net',
claimed_by_user_id=c.user._id)
ThreadLocalORMSession.flush_all()
assert addr.claimed_by_user() == c.user
addr2 = M.EmailAddress.create('test@domain.net')
addr3 = M.EmailAddress.create('test_admin@domain.net')
# Duplicate emails are allowed, until the email is confirmed
assert addr3 is not addr
assert addr2 is not addr
assert addr2
addr4 = M.EmailAddress.create('test@DOMAIN.NET')
assert addr4 is not addr2
assert addr is c.user.address_object('test_admin@domain.net')
c.user.claim_address('test@DOMAIN.NET')
assert 'test@domain.net' in c.user.email_addresses
@with_setup(setUp)
def test_email_address_lookup_helpers():
addr = M.EmailAddress.create('TEST@DOMAIN.NET')
nobody = M.EmailAddress.create('nobody@example.com')
ThreadLocalORMSession.flush_all()
assert_equal(addr.email, 'TEST@domain.net')
assert_equal(M.EmailAddress.get(email='TEST@DOMAIN.NET'), addr)
assert_equal(M.EmailAddress.get(email='TEST@domain.net'), addr)
assert_equal(M.EmailAddress.get(email='test@domain.net'), None)
assert_equal(M.EmailAddress.get(email=None), None)
assert_equal(M.EmailAddress.get(email='nobody@example.com'), nobody)
# invalid email returns None, but not nobody@example.com as before
assert_equal(M.EmailAddress.get(email='invalid'), None)
assert_equal(M.EmailAddress.find(dict(email='TEST@DOMAIN.NET')).all(), [addr])
assert_equal(M.EmailAddress.find(dict(email='TEST@domain.net')).all(), [addr])
assert_equal(M.EmailAddress.find(dict(email='test@domain.net')).all(), [])
assert_equal(M.EmailAddress.find(dict(email=None)).all(), [])
assert_equal(M.EmailAddress.find(dict(email='nobody@example.com')).all(), [nobody])
# invalid email returns empty query, but not nobody@example.com as before
assert_equal(M.EmailAddress.find(dict(email='invalid')).all(), [])
@with_setup(setUp)
def test_email_address_canonical():
assert_equal(M.EmailAddress.canonical('nobody@EXAMPLE.COM'),
'nobody@example.com')
assert_equal(M.EmailAddress.canonical('nobody@example.com'),
'nobody@example.com')
assert_equal(M.EmailAddress.canonical('I Am Nobody <nobody@example.com>'),
'nobody@example.com')
assert_equal(M.EmailAddress.canonical(' nobody@example.com\t'),
'nobody@example.com')
assert_equal(M.EmailAddress.canonical('I Am@Nobody <nobody@example.com> '),
'nobody@example.com')
assert_equal(M.EmailAddress.canonical(' No@body <no@body@example.com> '),
'no@body@example.com')
assert_equal(M.EmailAddress.canonical('no@body@example.com'),
'no@body@example.com')
assert_equal(M.EmailAddress.canonical('invalid'), None)
@with_setup(setUp)
def test_email_address_send_verification_link():
addr = M.EmailAddress(email='test_admin@domain.net',
claimed_by_user_id=c.user._id)
addr.send_verification_link()
with patch('allura.tasks.mail_tasks.smtp_client._client') as _client:
M.MonQTask.run_ready()
return_path, rcpts, body = _client.sendmail.call_args[0]
assert_equal(rcpts, ['test_admin@domain.net'])
@td.with_user_project('test-admin')
@with_setup(setUp)
def test_user():
assert c.user.url() .endswith('/u/test-admin/')
assert c.user.script_name .endswith('/u/test-admin/')
assert_equal(set(p.shortname for p in c.user.my_projects()),
set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
# delete one of the projects and make sure it won't appear in my_projects()
p = M.Project.query.get(shortname='test2')
p.deleted = True
ThreadLocalORMSession.flush_all()
assert_equal(set(p.shortname for p in c.user.my_projects()),
set(['test', 'u/test-admin', 'adobe-1', '--init--']))
u = M.User.register(dict(
username='nosetest-user'))
ThreadLocalORMSession.flush_all()
assert_equal(u.private_project().shortname, 'u/nosetest-user')
roles = g.credentials.user_roles(
u._id, project_id=u.private_project().root_project._id)
assert len(roles) == 3, roles
u.set_password('foo')
provider = plugin.LocalAuthenticationProvider(Request.blank('/'))
assert provider._validate_password(u, 'foo')
assert not provider._validate_password(u, 'foobar')
u.set_password('foobar')
assert provider._validate_password(u, 'foobar')
assert not provider._validate_password(u, 'foo')
@with_setup(setUp)
def test_user_project_creates_on_demand():
u = M.User.register(dict(username='foobar123'), make_project=False)
ThreadLocalORMSession.flush_all()
assert not M.Project.query.get(shortname='u/foobar123')
assert u.private_project()
assert M.Project.query.get(shortname='u/foobar123')
@with_setup(setUp)
def test_user_project_already_deleted_creates_on_demand():
u = M.User.register(dict(username='foobar123'), make_project=True)
p = M.Project.query.get(shortname='u/foobar123')
p.deleted = True
ThreadLocalORMSession.flush_all()
assert not M.Project.query.get(shortname='u/foobar123', deleted=False)
assert u.private_project()
ThreadLocalORMSession.flush_all()
assert M.Project.query.get(shortname='u/foobar123', deleted=False)
@with_setup(setUp)
def test_user_project_does_not_create_on_demand_for_disabled_user():
u = M.User.register(
dict(username='foobar123', disabled=True), make_project=False)
ThreadLocalORMSession.flush_all()
assert not u.private_project()
assert not M.Project.query.get(shortname='u/foobar123')
@with_setup(setUp)
def test_user_project_does_not_create_on_demand_for_anonymous_user():
u = M.User.anonymous()
ThreadLocalORMSession.flush_all()
assert not u.private_project()
assert not M.Project.query.get(shortname='u/anonymous')
assert not M.Project.query.get(shortname='u/*anonymous')
@with_setup(setUp)
@patch('allura.model.auth.log')
def test_user_by_email_address(log):
u1 = M.User.register(dict(username='abc1'), make_project=False)
u2 = M.User.register(dict(username='abc2'), make_project=False)
addr1 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
claimed_by_user_id=u1._id)
addr2 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
claimed_by_user_id=u2._id)
# both users are disabled
u1.disabled, u2.disabled = True, True
ThreadLocalORMSession.flush_all()
assert_equal(M.User.by_email_address('abc123@abc.me'), None)
assert_equal(log.warn.call_count, 0)
# only u2 is active
u1.disabled, u2.disabled = True, False
ThreadLocalORMSession.flush_all()
assert_equal(M.User.by_email_address('abc123@abc.me'), u2)
assert_equal(log.warn.call_count, 0)
# both are active
u1.disabled, u2.disabled = False, False
ThreadLocalORMSession.flush_all()
assert_in(M.User.by_email_address('abc123@abc.me'), [u1, u2])
assert_equal(log.warn.call_count, 1)
# invalid email returns None, but not user which claimed
# nobody@example.com as before
nobody = M.EmailAddress(email='nobody@example.com', confirmed=True,
claimed_by_user_id=u1._id)
ThreadLocalORMSession.flush_all()
assert_equal(M.User.by_email_address('nobody@example.com'), u1)
assert_equal(M.User.by_email_address('invalid'), None)
@with_setup(setUp)
def test_project_role():
role = M.ProjectRole(project_id=c.project._id, name='test_role')
M.ProjectRole.by_user(c.user, upsert=True).roles.append(role._id)
ThreadLocalORMSession.flush_all()
roles = g.credentials.user_roles(
c.user._id, project_id=c.project.root_project._id)
roles_ids = [r['_id'] for r in roles]
roles = M.ProjectRole.query.find({'_id': {'$in': roles_ids}})
for pr in roles:
assert pr.display()
pr.special
assert pr.user in (c.user, None, M.User.anonymous())
@with_setup(setUp)
def test_default_project_roles():
roles = dict(
(pr.name, pr)
for pr in M.ProjectRole.query.find(dict(
project_id=c.project._id)).all()
if pr.name)
assert 'Admin' in roles.keys(), roles.keys()
assert 'Developer' in roles.keys(), roles.keys()
assert 'Member' in roles.keys(), roles.keys()
assert roles['Developer']._id in roles['Admin'].roles
assert roles['Member']._id in roles['Developer'].roles
# There're 1 user assigned to project, represented by
# relational (vs named) ProjectRole's
assert len(roles) == M.ProjectRole.query.find(dict(
project_id=c.project._id)).count() - 1
@with_setup(setUp)
def test_email_address_claimed_by_user():
addr = M.EmailAddress(email='test_admin@domain.net',
claimed_by_user_id=c.user._id)
c.user.disabled = True
ThreadLocalORMSession.flush_all()
assert addr.claimed_by_user() is None
@td.with_user_project('test-admin')
@with_setup(setUp)
def test_user_projects_by_role():
assert_equal(set(p.shortname for p in c.user.my_projects()),
set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
assert_equal(set(p.shortname for p in c.user.my_projects_by_role_name('Admin')),
set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
# Remove admin access from c.user to test2 project
project = M.Project.query.get(shortname='test2')
admin_role = M.ProjectRole.by_name('Admin', project)
developer_role = M.ProjectRole.by_name('Developer', project)
user_role = M.ProjectRole.by_user(c.user, project=project, upsert=True)
user_role.roles.remove(admin_role._id)
user_role.roles.append(developer_role._id)
ThreadLocalORMSession.flush_all()
g.credentials.clear()
assert_equal(set(p.shortname for p in c.user.my_projects()),
set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
assert_equal(set(p.shortname for p in c.user.my_projects_by_role_name('Admin')),
set(['test', 'u/test-admin', 'adobe-1', '--init--']))
@td.with_user_project('test-admin')
@with_setup(setUp)
def test_user_projects_unnamed():
"""
Confirm that spurious ProjectRoles associating a user with
a project to which they do not belong to any named group
don't cause the user to count as a member of the project.
"""
sub1 = M.Project.query.get(shortname='test/sub1')
M.ProjectRole(
user_id=c.user._id,
project_id=sub1._id)
ThreadLocalORMSession.flush_all()
project_names = [p.shortname for p in c.user.my_projects()]
assert_not_in('test/sub1', project_names)
assert_in('test', project_names)
@patch.object(g, 'user_message_max_messages', 3)
def test_check_sent_user_message_times():
user1 = M.User.register(dict(username='test-user'), make_project=False)
time1 = datetime.utcnow() - timedelta(minutes=30)
time2 = datetime.utcnow() - timedelta(minutes=45)
time3 = datetime.utcnow() - timedelta(minutes=70)
user1.sent_user_message_times = [time1, time2, time3]
assert user1.can_send_user_message()
assert_equal(len(user1.sent_user_message_times), 2)
user1.sent_user_message_times.append(
datetime.utcnow() - timedelta(minutes=15))
assert not user1.can_send_user_message()
@td.with_user_project('test-admin')
@with_setup(setUp)
def test_user_track_active():
# without this session flushing inside track_active raises Exception
setup_functional_test()
c.user = M.User.by_username('test-admin')
assert_equal(c.user.last_access['session_date'], None)
assert_equal(c.user.last_access['session_ip'], None)
assert_equal(c.user.last_access['session_ua'], None)
req = Mock(headers={'User-Agent': 'browser'}, remote_addr='addr')
c.user.track_active(req)
c.user = M.User.by_username(c.user.username)
assert_not_equal(c.user.last_access['session_date'], None)
assert_equal(c.user.last_access['session_ip'], 'addr')
assert_equal(c.user.last_access['session_ua'], 'browser')
# ensure that session activity tracked with a whole-day granularity
prev_date = c.user.last_access['session_date']
c.user.track_active(req)
c.user = M.User.by_username(c.user.username)
assert_equal(c.user.last_access['session_date'], prev_date)
yesterday = datetime.utcnow() - timedelta(1)
c.user.last_access['session_date'] = yesterday
session(c.user).flush(c.user)
c.user.track_active(req)
c.user = M.User.by_username(c.user.username)
assert_true(c.user.last_access['session_date'] > yesterday)
# ...or if IP or User Agent has changed
req.remote_addr = 'new addr'
c.user.track_active(req)
c.user = M.User.by_username(c.user.username)
assert_equal(c.user.last_access['session_ip'], 'new addr')
assert_equal(c.user.last_access['session_ua'], 'browser')
req.headers['User-Agent'] = 'new browser'
c.user.track_active(req)
c.user = M.User.by_username(c.user.username)
assert_equal(c.user.last_access['session_ip'], 'new addr')
assert_equal(c.user.last_access['session_ua'], 'new browser')
@with_setup(setUp)
def test_user_index():
c.user.email_addresses = ['email1', 'email2']
c.user.set_pref('email_address', 'email2')
idx = c.user.index()
assert_equal(idx['id'], c.user.index_id())
assert_equal(idx['title'], 'User test-admin')
assert_equal(idx['type_s'], 'User')
assert_equal(idx['username_s'], 'test-admin')
assert_equal(idx['email_addresses_t'], 'email1 email2')
assert_equal(idx['email_address_s'], 'email2')
assert_in('last_password_updated_dt', idx)
assert_equal(idx['disabled_b'], False)
assert_in('results_per_page_i', idx)
assert_in('email_format_s', idx)
assert_in('disable_user_messages_b', idx)
assert_equal(idx['display_name_t'], 'Test Admin')
assert_equal(idx['sex_s'], 'Unknown')
assert_in('birthdate_dt', idx)
assert_in('localization_s', idx)
assert_in('timezone_s', idx)
assert_in('socialnetworks_t', idx)
assert_in('telnumbers_t', idx)
assert_in('skypeaccount_s', idx)
assert_in('webpages_t', idx)
assert_in('skills_t', idx)
assert_in('last_access_login_date_dt', idx)
assert_in('last_access_login_ip_s', idx)
assert_in('last_access_login_ua_t', idx)
assert_in('last_access_session_date_dt', idx)
assert_in('last_access_session_ip_s', idx)
assert_in('last_access_session_ua_t', idx)
# provided bby auth provider
assert_in('user_registration_date_dt', idx)
@with_setup(setUp)
def test_user_index_none_values():
c.user.email_addresses = [None]
c.user.set_pref('telnumbers', [None])
c.user.set_pref('webpages', [None])
idx = c.user.index()
assert_equal(idx['email_addresses_t'], '')
assert_equal(idx['telnumbers_t'], '')
assert_equal(idx['webpages_t'], '')
@with_setup(setUp)
def test_user_backfill_login_details():
with h.push_config(request, user_agent='TestBrowser/55'):
# these shouldn't match
h.auditlog_user('something happened')
h.auditlog_user('blah blah Password changed')
with h.push_config(request, user_agent='TestBrowser/56'):
# these should all match, but only one entry created for this ip/ua
h.auditlog_user('Account activated')
h.auditlog_user('Successful login')
h.auditlog_user('Password changed')
with h.push_config(request, user_agent='TestBrowser/57'):
# this should match too
h.auditlog_user('Set up multifactor TOTP')
ThreadLocalORMSession.flush_all()
auth_provider = plugin.AuthenticationProvider.get(None)
c.user.backfill_login_details(auth_provider)
assert_equal(sorted(c.user.previous_login_details), [
{'ip': '127.0.0.1', 'ua': 'TestBrowser/56'},
{'ip': '127.0.0.1', 'ua': 'TestBrowser/57'},
])