# Licensed to the Apache Software Foundation (ASF) under one
#       or more contributor license agreements.  See the NOTICE file
#       distributed with this work for additional information
#       regarding copyright ownership.  The ASF licenses this file
#       to you under the Apache License, Version 2.0 (the
#       "License"); you may not use this file except in compliance
#       with the License.  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#       Unless required by applicable law or agreed to in writing,
#       software distributed under the License is distributed on an
#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#       KIND, either express or implied.  See the License for the
#       specific language governing permissions and limitations
#       under the License.

from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
import calendar
from datetime import datetime, time, timedelta
from time import time as time_time
import json
from six.moves.urllib.parse import urlparse, parse_qs
from six.moves.urllib.parse import urlencode

from bson import ObjectId
import re
from ming.orm.ormsession import ThreadLocalORMSession, session
from tg import config, expose
from mock import patch, Mock
import mock
from alluratest.tools import (
    assert_equal,
    assert_not_equal,
    assert_is_none,
    assert_is_not_none,
    assert_in,
    assert_not_in,
    assert_true,
    assert_false,
)
from tg import tmpl_context as c, app_globals as g
import oauth2

from allura.tests import TestController
from allura.tests import decorators as td
from allura.tests.decorators import audits, out_audits
from alluratest.controller import setup_trove_categories
from allura import model as M
from allura.lib import plugin
from allura.lib import helpers as h
from allura.lib.multifactor import TotpService, RecoveryCodeService
import six
from six.moves import range


def unentity(s):
    return s.replace('&quot;', '"').replace('&#34;', '"')


class TestAuth(TestController):
    def test_login(self):
        self.app.get('/auth/')
        r = self.app.post('/auth/send_verification_link', params=dict(a='test@example.com',
                                                                      _session_id=self.app.cookies['_session_id']))
        email = M.User.query.get(username='test-admin').email_addresses[0]
        r = self.app.post('/auth/send_verification_link', params=dict(a=email,
                                                                      _session_id=self.app.cookies['_session_id']))
        ThreadLocalORMSession.flush_all()
        r = self.app.get('/auth/verify_addr', params=dict(a='foo'))
        assert json.loads(self.webflash(r))['status'] == 'error', self.webflash(r)
        ea = M.EmailAddress.find({'email': email}).first()
        r = self.app.get('/auth/verify_addr', params=dict(a=ea.nonce))
        assert json.loads(self.webflash(r))['status'] == 'ok', self.webflash(r)
        r = self.app.get('/auth/logout')

        with audits('Successful login', user=True):
            r = self.app.post('/auth/do_login', params=dict(
                username='test-user', password='foo',
                _session_id=self.app.cookies['_session_id']),
                antispam=True).follow()
            assert_equal(r.headers['Location'], 'http://localhost/dashboard')

        r = self.app.post('/auth/do_login', antispam=True, params=dict(
            username='test-user', password='foo', honey1='robot',  # bad honeypot value
            _session_id=self.app.cookies['_session_id']),
                          extra_environ={'regular_antispam_err_handling_even_when_tests': str('true')},
                          status=302)
        wf = json.loads(self.webflash(r))
        assert_equal(wf['status'], 'error')
        assert_equal(wf['message'], 'Spambot protection engaged')

        with audits('Failed login', user=True):
            r = self.app.post('/auth/do_login', antispam=True, params=dict(
                username='test-user', password='food',
                _session_id=self.app.cookies['_session_id']))
            assert 'Invalid login' in str(r), r.showbrowser()

        r = self.app.post('/auth/do_login', antispam=True, params=dict(
            username='test-usera', password='foo',
            _session_id=self.app.cookies['_session_id']))
        assert 'Invalid login' in str(r), r.showbrowser()

    def test_login_invalid_username(self):
        extra = {'username': str('*anonymous')}
        r = self.app.get('/auth/', extra_environ=extra)
        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test@user.com'
        f[encoded['password']] = 'foo'
        r = f.submit(extra_environ={'username': str('*anonymous')})
        r.mustcontain('Usernames only include small letters, ')

    def test_login_diff_ips_ok(self):
        # exercises AntiSpam.validate methods
        extra = {'username': str('*anonymous'), 'REMOTE_ADDR': str('11.22.33.44')}
        r = self.app.get('/auth/', extra_environ=extra)

        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test-user'
        f[encoded['password']] = 'foo'
        with audits('Successful login', user=True):
            r = f.submit(extra_environ={'username': str('*anonymous'), 'REMOTE_ADDR': str('11.22.33.99')})

    def test_login_diff_ips_bad(self):
        # exercises AntiSpam.validate methods
        extra = {'username': str('*anonymous'), 'REMOTE_ADDR': str('24.52.32.123')}
        r = self.app.get('/auth/', extra_environ=extra)

        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test-user'
        f[encoded['password']] = 'foo'
        r = f.submit(extra_environ={'username': str('*anonymous'), 'REMOTE_ADDR': str('11.22.33.99'),
                                    'regular_antispam_err_handling_even_when_tests': str('true')},
                     status=302)
        wf = json.loads(self.webflash(r))
        assert_equal(wf['status'], 'error')
        assert_equal(wf['message'], 'Spambot protection engaged')

    @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
    @patch('allura.tasks.mail_tasks.sendsimplemail')
    def test_login_hibp_compromised_password_untrusted_client(self, sendsimplemail):
        # first & only login by this user, so won't have any trusted previous logins
        self.app.extra_environ = {'disable_auth_magic': str('True')}
        r = self.app.get('/auth/')
        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test-user'
        f[encoded['password']] = 'foo'

        with audits('Attempted login from untrusted location with password in HIBP breach database', user=True):
            r = f.submit(status=200)

        r.mustcontain('reset your password via email.')
        r.mustcontain('reset your password via email.<br>\nPlease check your email')

        args, kwargs = sendsimplemail.post.call_args
        assert_equal(sendsimplemail.post.call_count, 1)
        assert_equal(kwargs['subject'], 'Update your %s password' % config['site_name'])
        assert_in('/auth/forgotten_password/', kwargs['text'])

        assert_equal([], M.UserLoginDetails.query.find().all())  # no records created

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    def test_login_hibp_compromised_password_trusted_client(self, sendsimplemail):
        self.app.extra_environ = {'disable_auth_magic': str('True')}

        # regular login first, so IP address will be recorded and then trusted
        r = self.app.get('/auth/')
        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test-user'
        f[encoded['password']] = 'foo'
        with audits('Successful login', user=True):
            f.submit(status=302)
        self.app.get('/auth/logout')

        # this login will get caught by HIBP check, but trusted due to IP address being same
        with patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True)):
            r = self.app.get('/auth/')
            f = r.forms[0]
            encoded = self.app.antispam_field_names(f)
            f[encoded['username']] = 'test-user'
            f[encoded['password']] = 'foo'

            with audits(r'Successful login with password in HIBP breach database, from trusted source '
                        r'\(reason: exact ip\)', user=True):
                r = f.submit(status=302)

            assert r.session.get('pwd-expired')
            assert_equal(r.session.get('expired-reason'), 'hibp')
            assert_equal(r.location, 'http://localhost/auth/pwd_expired')

            r = r.follow()
            r.mustcontain('must be updated to be more secure')

            # changing password covered in TestPasswordExpire

    def test_login_disabled(self):
        u = M.User.query.get(username='test-user')
        u.disabled = True
        r = self.app.get('/auth/', extra_environ={'username': str('*anonymous')})
        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test-user'
        f[encoded['password']] = 'foo'
        with audits('Failed login', user=True):
            r = f.submit(extra_environ={'username': str('*anonymous')})

    def test_login_pending(self):
        u = M.User.query.get(username='test-user')
        u.pending = True
        r = self.app.get('/auth/', extra_environ={'username': str('*anonymous')})
        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test-user'
        f[encoded['password']] = 'foo'
        with audits('Failed login', user=True):
            r = f.submit(extra_environ={'username': str('*anonymous')})

    def test_login_overlay(self):
        r = self.app.get('/auth/login_fragment/', extra_environ={'username': str('*anonymous')})
        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = 'test-user'
        f[encoded['password']] = 'foo'
        with audits('Successful login', user=True):
            r = f.submit(extra_environ={'username': str('*anonymous')})

    def test_logout(self):
        self.app.extra_environ = {'disable_auth_magic': str('True')}
        nav_pattern = ('nav', {'class': 'nav-main'})
        r = self.app.get('/auth/')

        r = self.app.post('/auth/do_login', params=dict(
            username='test-user', password='foo',
            _session_id=self.app.cookies['_session_id']),
            extra_environ={'REMOTE_ADDR': str('127.0.0.1')},
            antispam=True).follow().follow()

        logged_in_session = r.session['_id']
        links = r.html.find(*nav_pattern).findAll('a')
        assert_equal(links[-1].string, "Log Out")

        r = self.app.get('/auth/logout').follow().follow()
        logged_out_session = r.session['_id']
        assert logged_in_session is not logged_out_session
        links = r.html.find(*nav_pattern).findAll('a')
        assert_equal(links[-1].string, 'Log In')

    def test_track_login(self):
        user = M.User.by_username('test-user')
        assert_equal(user.last_access['login_date'], None)
        assert_equal(user.last_access['login_ip'], None)
        assert_equal(user.last_access['login_ua'], None)

        self.app.get('/').follow()  # establish session
        self.app.post('/auth/do_login',
                      headers={str('User-Agent'): str('browser')},
                      extra_environ={'REMOTE_ADDR': str('127.0.0.1')},
                      params=dict(
                          username='test-user',
                          password='foo',
                          _session_id=self.app.cookies['_session_id'],
                      ),
                      antispam=True,
                      )
        user = M.User.by_username('test-user')
        assert_not_equal(user.last_access['login_date'], None)
        assert_equal(user.last_access['login_ip'], '127.0.0.1')
        assert_equal(user.last_access['login_ua'], 'browser')

    def test_rememberme(self):
        username = M.User.query.get(username='test-user').username

        r = self.app.get('/').follow()  # establish session

        # Login as test-user with remember me checkbox off
        r = self.app.post('/auth/do_login', params=dict(
            username='test-user', password='foo',
            _session_id=self.app.cookies['_session_id'],
        ), antispam=True)
        assert_equal(r.session['username'], username)
        assert_equal(r.session['login_expires'], True)

        for header, contents in r.headerlist:
            if header == 'Set-cookie':
                assert_not_in('expires', contents)

        # Login as test-user with remember me checkbox on
        r = self.app.post('/auth/do_login', params=dict(
            username='test-user', password='foo', rememberme='on',
            _session_id=self.app.cookies['_session_id'],
        ), antispam=True)
        assert_equal(r.session['username'], username)
        assert_not_equal(r.session['login_expires'], True)

        for header, contents in r.headerlist:
            if header == 'Set-cookie':
                assert_in('expires', contents)

    @td.with_user_project('test-admin')
    def test_user_can_not_claim_duplicate_emails(self):
        email_address = 'test_abcd_123@domain.net'
        user = M.User.query.get(username='test-admin')
        addresses_number = len(user.email_addresses)
        self.app.get('/').follow()  # establish session
        self.app.post('/auth/preferences/update_emails',
                      params={
                          'new_addr.addr': email_address,
                          'new_addr.claim': 'Claim Address',
                          'primary_addr': 'test-admin@users.localhost',
                          'preferences.email_format': 'plain',
                          'password': 'foo',
                          '_session_id': self.app.cookies['_session_id'],
                      },
                      extra_environ=dict(username=str('test-admin')))

        assert M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).count() == 1
        r = self.app.post('/auth/preferences/update_emails',
                          params={
                              'new_addr.addr': email_address,
                              'new_addr.claim': 'Claim Address',
                              'primary_addr': 'test-admin@users.localhost',
                              'preferences.email_format': 'plain',
                              'password': 'foo',
                              '_session_id': self.app.cookies['_session_id'],
                          },
                          extra_environ=dict(username=str('test-admin')))

        assert json.loads(self.webflash(r))['status'] == 'error', self.webflash(r)
        assert M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).count() == 1
        assert len(M.User.query.get(username='test-admin').email_addresses) == addresses_number + 1

    @td.with_user_project('test-admin')
    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_user_added_claimed_address_by_other_user_confirmed(self, gen_message_id, sendsimplemail):
        self.app.get('/').follow()  # establish session
        email_address = 'test_abcd_123@domain.net'

        # test-user claimed & confirmed email address
        user = M.User.query.get(username='test-user')
        user.claim_address(email_address)
        email = M.EmailAddress.find(dict(email=email_address)).first()
        email.confirmed = True
        ThreadLocalORMSession.flush_all()

        # Claiming the same email address by test-admin
        # the email should be added to the email_addresses list but notifications should not be sent

        admin = M.User.query.get(username='test-admin')
        addresses_number = len(admin.email_addresses)
        r = self.app.post('/auth/preferences/update_emails',
                          params={
                              'new_addr.addr': email_address,
                              'new_addr.claim': 'Claim Address',
                              'primary_addr': 'test-admin@users.localhost',
                              'preferences.email_format': 'plain',
                              'password': 'foo',
                              '_session_id': self.app.cookies['_session_id'],
                          },
                          extra_environ=dict(username=str('test-admin')))

        assert json.loads(self.webflash(r))['status'] == 'ok'
        assert json.loads(self.webflash(r))['message'] == 'A verification email has been sent.  ' \
                                                          'Please check your email and click to confirm.'

        args, kwargs = sendsimplemail.post.call_args
        assert sendsimplemail.post.call_count == 1
        assert kwargs['toaddr'] == email_address
        assert kwargs['subject'] == '%s - Email address claim attempt' % config['site_name']
        assert "You tried to add %s to your Allura account, " \
               "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text']

        assert len(M.User.query.get(username='test-admin').email_addresses) == addresses_number + 1
        assert len(M.EmailAddress.find(dict(email=email_address)).all()) == 2

    @td.with_user_project('test-admin')
    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_user_added_claimed_address_by_other_user_not_confirmed(self, gen_message_id, sendsimplemail):
        email_address = 'test_abcd_1235@domain.net'

        # test-user claimed email address
        user = M.User.query.get(username='test-user')
        user.claim_address(email_address)
        email = M.EmailAddress.find(dict(email=email_address)).first()
        email.confirmed = False
        ThreadLocalORMSession.flush_all()
        # Claiming the same email address by test-admin
        # the email should be added to the email_addresses list but notifications should not be sent

        user1 = M.User.query.get(username='test-user-1')
        addresses_number = len(user1.email_addresses)
        self.app.get('/').follow()  # establish session
        r = self.app.post('/auth/preferences/update_emails',
                          params={
                              'new_addr.addr': email_address,
                              'new_addr.claim': 'Claim Address',
                              'primary_addr': 'test-user-1@users.localhost',
                              'preferences.email_format': 'plain',
                              'password': 'foo',
                              '_session_id': self.app.cookies['_session_id'],
                          },
                          extra_environ=dict(username=str('test-user-1')))

        assert json.loads(self.webflash(r))['status'] == 'ok'
        assert json.loads(self.webflash(r))['message'] == 'A verification email has been sent.  ' \
                                                          'Please check your email and click to confirm.'
        assert sendsimplemail.post.called
        assert len(M.User.query.get(username='test-user-1').email_addresses) == addresses_number + 1
        assert len(M.EmailAddress.find(dict(email=email_address)).all()) == 2

    @td.with_user_project('test-admin')
    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_user_cannot_claim_more_than_max_limit(self, gen_message_id, sendsimplemail):
        with h.push_config(config, **{'user_prefs.maximum_claimed_emails': '2'}):
            self.app.get('/').follow()  # establish session
            r = self.app.post('/auth/preferences/update_emails',
                              params={
                                  'new_addr.addr': 'test_abcd_1@domain.net',
                                  'new_addr.claim': 'Claim Address',
                                  'primary_addr': 'test-user-1@users.localhost',
                                  'preferences.email_format': 'plain',
                                  'password': 'foo',
                                  '_session_id': self.app.cookies['_session_id'],
                              },
                              extra_environ=dict(username=str('test-user-1')))
            assert json.loads(self.webflash(r))['status'] == 'ok'

            r = self.app.post('/auth/preferences/update_emails',
                              params={
                                  'new_addr.addr': 'test_abcd_2@domain.net',
                                  'new_addr.claim': 'Claim Address',
                                  'primary_addr': 'test-user-1@users.localhost',
                                  'preferences.email_format': 'plain',
                                  'password': 'foo',
                                  '_session_id': self.app.cookies['_session_id'],
                              },
                              extra_environ=dict(username=str('test-user-1')))

            assert json.loads(self.webflash(r))['status'] == 'error'
            assert json.loads(self.webflash(r))['message'] == 'You cannot claim more than 2 email addresses.'

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_verification_link_for_confirmed_email(self, gen_message_id, sendsimplemail):
        self.app.get('/').follow()  # establish session
        email_address = 'test_abcd@domain.net'

        # test-user claimed email address
        user = M.User.query.get(username='test-user')
        user.claim_address(email_address)
        email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first()
        email.confirmed = True

        user1 = M.User.query.get(username='test-user-1')
        user1.claim_address(email_address)
        email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user1._id)).first()
        email.confirmed = False

        ThreadLocalORMSession.flush_all()

        r = self.app.post('/auth/send_verification_link',
                          params=dict(a=email_address, _session_id=self.app.cookies['_session_id']),
                          extra_environ=dict(username=str('test-user-1'), _session_id=self.app.cookies['_session_id']))

        assert json.loads(self.webflash(r))['status'] == 'ok'
        assert json.loads(self.webflash(r))['message'] == 'Verification link sent'

        args, kwargs = sendsimplemail.post.call_args
        assert sendsimplemail.post.call_count == 1
        assert kwargs['toaddr'] == email_address
        assert kwargs['subject'] == '%s - Email address claim attempt' % config['site_name']
        assert "You tried to add %s to your Allura account, " \
               "but it is already claimed by your %s account." % (email_address, user.username) in kwargs['text']

    def test_invalidate_verification_link_if_email_was_confirmed(self):
        self.app.get('/').follow()  # establish session
        email_address = 'test_abcd@domain.net'

        # test-user claimed email address
        user = M.User.query.get(username='test-user')
        user.claim_address(email_address)
        email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first()
        email.confirmed = False
        ThreadLocalORMSession.flush_all()

        self.app.post('/auth/send_verification_link',
                      params=dict(a=email_address,
                                  _session_id=self.app.cookies['_session_id']),
                      extra_environ=dict(username=str('test-user')))

        user1 = M.User.query.get(username='test-user-1')
        user1.claim_address(email_address)
        email1 = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user1._id)).first()
        email1.confirmed = True
        ThreadLocalORMSession.flush_all()
        # Verify first email with the verification link
        r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce),
                         extra_environ=dict(username=str('test-user')))

        assert json.loads(self.webflash(r))['status'] == 'error'
        email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first()
        assert not email.confirmed

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_verify_addr_correct_session(self, gen_message_id, sendsimplemail):
        self.app.get('/').follow()  # establish session
        email_address = 'test_abcd@domain.net'

        # test-user claimed email address
        user = M.User.query.get(username='test-user')
        user.claim_address(email_address)
        email = M.EmailAddress.find(dict(email=email_address, claimed_by_user_id=user._id)).first()
        email.confirmed = False
        ThreadLocalORMSession.flush_all()

        self.app.post('/auth/send_verification_link',
                      params=dict(a=email_address,
                                  _session_id=self.app.cookies['_session_id']),
                      extra_environ=dict(username=str('test-user')))

        # logged out, gets redirected to login page
        r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce),
                         extra_environ=dict(username=str('*anonymous')))
        assert_in('/auth/?return_to=%2Fauth%2Fverify_addr', r.location)

        # logged in as someone else
        r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce),
                         extra_environ=dict(username=str('test-admin')))
        assert_in('/auth/?return_to=%2Fauth%2Fverify_addr', r.location)
        assert_equal('You must be logged in to the correct account', json.loads(self.webflash(r))['message'])
        assert_equal('warning', json.loads(self.webflash(r))['status'])

        # logged in as correct user
        r = self.app.get('/auth/verify_addr', params=dict(a=email.nonce),
                         extra_environ=dict(username=str('test-user')))
        assert_in('confirmed', json.loads(self.webflash(r))['message'])
        assert_equal('ok', json.loads(self.webflash(r))['status'])

        # assert 'email added' notification email sent
        args, kwargs = sendsimplemail.post.call_args
        assert_equal(kwargs['toaddr'], user._id)
        assert_equal(kwargs['subject'], 'New Email Address Added')

    @staticmethod
    def _create_password_reset_hash():
        """ Generates a password reset token for a given user.

        :return: User object
        :rtype: User
        """
        # test-user claimed email address
        user = M.User.by_username('test-admin')
        user.set_tool_data('AuthPasswordReset',
                           hash="generated_hash_value",
                           hash_expiry="04-08-2020")
        hash = user.get_tool_data('AuthPasswordReset', 'hash')
        session(user).flush(user)

        hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')
        assert_equal(hash, 'generated_hash_value')
        assert_equal(hash_expiry, '04-08-2020')
        return user

    def test_token_generator(self):
        """ Generates new token invalidation tests.

        The tests cover: changing, claiming, updating, removing email addresses.
        :returns: email_change_invalidates_token
        """
        _params = [{'new_addr.addr': 'test_abcd@domain.net',  # Change primary address
                    'primary_addr': 'test@example.com', },
                   {'new_addr.addr': 'test@example.com',  # Claim new address
                    'new_addr.claim': 'Claim Address',
                    'primary_addr': 'test-admin@users.localhost',
                    'password': 'foo',
                    'preferences.email_format': 'plain'},
                   {'addr-1.ord': '1',  # remove test-admin@users.localhost
                    'addr-1.delete': 'on',
                    'addr-2.ord': '2',
                    'new_addr.addr': '',
                    'primary_addr': 'test-admin@users.localhost',
                    'password': 'foo',
                    'preferences.email_format': 'plain'},
                   {'addr-1.ord': '1',  # Remove email
                    'addr-2.ord': '2',
                    'addr-2.delete': 'on',
                    'new_addr.addr': '',
                    'primary_addr': 'test-admin@users.localhost'}]

        for param in _params:
            yield self.email_change_invalidates_token, param

    def email_change_invalidates_token(self, change_params):
        user = self._create_password_reset_hash()
        session(user).flush(user)

        self.app.get('/').follow()  # establish session
        change_params['_session_id'] = self.app.cookies['_session_id']
        self.app.post('/auth/preferences/update_emails',
                      extra_environ=dict(username=str('test-admin')),
                      params=change_params)

        u = M.User.by_username('test-admin')
        print((u.get_tool_data('AuthPasswordReset', 'hash')))
        assert_equal(u.get_tool_data('AuthPasswordReset', 'hash'), '')
        assert_equal(u.get_tool_data('AuthPasswordReset', 'hash_expiry'), '')

    @td.with_user_project('test-admin')
    def test_change_password(self):
        self.app.get('/').follow()  # establish session
        # Get and assert user with password reset token.
        user = self._create_password_reset_hash()
        old_pass = user.get_pref('password')

        # Change password
        with audits('Password changed', user=True):
            self.app.post('/auth/preferences/change_password',
                          extra_environ=dict(username=str('test-admin')),
                          params={
                              'oldpw': 'foo',
                              'pw': 'asdfasdf',
                              'pw2': 'asdfasdf',
                              '_session_id': self.app.cookies['_session_id'],
                          })

        # Confirm password was changed.
        assert_not_equal(old_pass, user.get_pref('password'))

        # Confirm any existing tokens were reset.
        assert_equal(user.get_tool_data('AuthPasswordReset', 'hash'), '')
        assert_equal(user.get_tool_data('AuthPasswordReset', 'hash_expiry'), '')

        # Confirm an email was sent
        tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
        assert_equal(len(tasks), 1)
        assert_equal(tasks[0].kwargs['subject'], 'Password Changed')
        assert_in('The password for your', tasks[0].kwargs['text'])

    @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
    @td.with_user_project('test-admin')
    def test_change_password_hibp(self):
        self.app.get('/').follow()  # establish session
        # Get and assert user with password reset token.
        user = self._create_password_reset_hash()
        old_pass = user.get_pref('password')

        # Attempt change password with weak pwd
        r = self.app.post('/auth/preferences/change_password',
                          extra_environ=dict(username=str('test-admin')),
                          params={
                              'oldpw': 'foo',
                              'pw': 'password',
                              'pw2': 'password',
                              '_session_id': self.app.cookies['_session_id'],
                          })

        assert 'Unsafe' in str(r.headers)

        r = self.app.post('/auth/preferences/change_password',
                          extra_environ=dict(username=str('test-admin')),
                          params={
                              'oldpw': 'foo',
                              'pw': '3j84rhoirwnoiwrnoiw',
                              'pw2': '3j84rhoirwnoiwrnoiw',
                              '_session_id': self.app.cookies['_session_id'],
                          })
        assert 'Unsafe' not in str(r.headers)

        # Confirm password was changed.
        user = M.User.by_username('test-admin')
        assert_not_equal(old_pass, user.get_pref('password'))

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    @td.with_user_project('test-admin')
    def test_prefs(self, gen_message_id, sendsimplemail):
        r = self.app.get('/auth/preferences/',
                         extra_environ=dict(username=str('test-admin')))
        # check preconditions of test data
        assert 'test@example.com' not in r
        assert 'test-admin@users.localhost' in r
        assert_equal(M.User.query.get(username='test-admin').get_pref('email_address'),
                     'test-admin@users.localhost')

        # add test@example
        with td.audits('New email address: test@example.com', user=True):
            r = self.app.post('/auth/preferences/update_emails',
                              extra_environ=dict(username=str('test-admin')),
                              params={
                                  'new_addr.addr': 'test@example.com',
                                  'new_addr.claim': 'Claim Address',
                                  'primary_addr': 'test-admin@users.localhost',
                                  'password': 'foo',
                                  'preferences.email_format': 'plain',
                                  '_session_id': self.app.cookies['_session_id'],
                              })
        r = self.app.get('/auth/preferences/')
        assert 'test@example.com' in r
        user = M.User.query.get(username='test-admin')
        assert_equal(user.get_pref('email_address'), 'test-admin@users.localhost')

        # remove test-admin@users.localhost
        with td.audits('Email address deleted: test-admin@users.localhost', user=True):
            r = self.app.post('/auth/preferences/update_emails',
                              extra_environ=dict(username=str('test-admin')),
                              params={
                                  'addr-1.ord': '1',
                                  'addr-1.delete': 'on',
                                  'addr-2.ord': '2',
                                  'new_addr.addr': '',
                                  'primary_addr': 'test-admin@users.localhost',
                                  'password': 'foo',
                                  'preferences.email_format': 'plain',
                                  '_session_id': self.app.cookies['_session_id'],
                              })

        # assert 'email_removed' notification email sent
        args, kwargs = sendsimplemail.post.call_args
        assert_equal(kwargs['toaddr'], user._id)
        assert_equal(kwargs['subject'], 'Email Address Removed')

        r = self.app.get('/auth/preferences/')
        assert 'test-admin@users.localhost' not in r
        # preferred address has not changed if email is not verified
        user = M.User.query.get(username='test-admin')
        assert_equal(user.get_pref('email_address'), None)

        with td.audits('Display Name changed Test Admin => Admin', user=True):
            r = self.app.post('/auth/preferences/update',
                              params={'preferences.display_name': 'Admin',
                                      '_session_id': self.app.cookies['_session_id'],
                                      },
                              extra_environ=dict(username=str('test-admin')))

    @td.with_user_project('test-admin')
    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_email_prefs_change_requires_password(self, gen_message_id, sendsimplemail):
        self.app.get('/').follow()  # establish session
        # Claim new email
        new_email_params = {
            'new_addr.addr': 'test@example.com',
            'new_addr.claim': 'Claim Address',
            'primary_addr': 'test-admin@users.localhost',
            '_session_id': self.app.cookies['_session_id'],
        }
        r = self.app.post('/auth/preferences/update_emails',
                          params=new_email_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_in('You must provide your current password to claim new email', self.webflash(r))
        assert_not_in('test@example.com', r.follow())
        new_email_params['password'] = 'bad pass'

        r = self.app.post('/auth/preferences/update_emails',
                          params=new_email_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_in('You must provide your current password to claim new email', self.webflash(r))
        assert_not_in('test@example.com', r.follow())
        new_email_params['password'] = 'foo'  # valid password

        r = self.app.post('/auth/preferences/update_emails',
                          params=new_email_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_not_in('You must provide your current password to claim new email', self.webflash(r))
        assert_in('test@example.com', r.follow())

        # Change primary address
        change_primary_params = {
            'new_addr.addr': '',
            'primary_addr': 'test@example.com',
            '_session_id': self.app.cookies['_session_id'],
        }
        r = self.app.post('/auth/preferences/update_emails',
                          params=change_primary_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_in('You must provide your current password to change primary address', self.webflash(r))
        assert_equal(M.User.by_username('test-admin').get_pref('email_address'), 'test-admin@users.localhost')
        change_primary_params['password'] = 'bad pass'

        r = self.app.post('/auth/preferences/update_emails',
                          params=change_primary_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_in('You must provide your current password to change primary address', self.webflash(r))
        assert_equal(M.User.by_username('test-admin').get_pref('email_address'), 'test-admin@users.localhost')
        change_primary_params['password'] = 'foo'  # valid password

        self.app.get('/auth/preferences/')  # let previous 'flash' message cookie get used up
        r = self.app.post('/auth/preferences/update_emails',
                          params=change_primary_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_not_in('You must provide your current password to change primary address', self.webflash(r))
        assert_equal(M.User.by_username('test-admin').get_pref('email_address'), 'test@example.com')

        # assert 'email added' notification email sent using original primary addr
        args, kwargs = sendsimplemail.post.call_args
        assert_equal(kwargs['toaddr'], 'test-admin@users.localhost')
        assert_equal(kwargs['subject'], 'Primary Email Address Changed')

        # Remove email
        remove_email_params = {
            'addr-1.ord': '1',
            'addr-2.ord': '2',
            'addr-2.delete': 'on',
            'new_addr.addr': '',
            'primary_addr': 'test-admin@users.localhost',
            '_session_id': self.app.cookies['_session_id'],
        }
        r = self.app.post('/auth/preferences/update_emails',
                          params=remove_email_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_in('You must provide your current password to delete an email', self.webflash(r))
        assert_in('test@example.com', r.follow())
        remove_email_params['password'] = 'bad pass'
        r = self.app.post('/auth/preferences/update_emails',
                          params=remove_email_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_in('You must provide your current password to delete an email', self.webflash(r))
        assert_in('test@example.com', r.follow())
        remove_email_params['password'] = 'foo'  # vallid password
        r = self.app.post('/auth/preferences/update_emails',
                          params=remove_email_params,
                          extra_environ=dict(username=str('test-admin')))
        assert_not_in('You must provide your current password to delete an email', self.webflash(r))
        assert_not_in('test@example.com', r.follow())

    @td.with_user_project('test-admin')
    def test_prefs_subscriptions(self):
        r = self.app.get('/auth/subscriptions/',
                         extra_environ=dict(username=str('test-admin')))
        subscriptions = M.Mailbox.query.find(dict(
            user_id=c.user._id, is_flash=False)).all()
        # make sure page actually lists all the user's subscriptions
        assert len(subscriptions) > 0, 'Test user has no subscriptions, cannot verify that they are shown'
        for m in subscriptions:
            assert str(m._id) in r, "Page doesn't list subscription for Mailbox._id = %s" % m._id

        # make sure page lists all tools which user can subscribe
        user = M.User.query.get(username='test-admin')
        for p in user.my_projects():
            for ac in p.app_configs:
                if not M.Mailbox.subscribed(project_id=p._id, app_config_id=ac._id):
                    if ac.tool_name in ('activity', 'admin', 'search', 'userstats', 'profile'):
                        # these have has_notifications=False
                        assert str(ac._id) not in r, "Page lists tool %s but it should not" % ac.tool_name
                    else:
                        assert str(ac._id) in r, "Page doesn't list tool %s" % ac.tool_name

    @td.with_user_project('test-admin')
    def test_update_user_notifications(self):
        self.app.get('/').follow()  # establish session
        assert not M.User.query.get(username='test-admin').get_pref('mention_notifications')
        self.app.post('/auth/subscriptions/update_user_notifications',
                      params={'_session_id': self.app.cookies['_session_id'],
                              })
        assert not M.User.query.get(username='test-admin').get_pref('mention_notifications')
        self.app.post('/auth/subscriptions/update_user_notifications',
                      params={'allow_umnotif': 'on',
                              '_session_id': self.app.cookies['_session_id'],
                              })
        assert M.User.query.get(username='test-admin').get_pref('mention_notifications')

    def _find_subscriptions_form(self, r):
        form = None
        for f in six.itervalues(r.forms):
            if f.action == 'update_subscriptions':
                form = f
                break
        assert form is not None, "Can't find subscriptions form"
        return form

    def _find_subscriptions_field(self, form, subscribed=False):
        field_name = None
        for k, v in six.iteritems(form.fields):
            if subscribed:
                check = v and v[0].value == 'on'
            else:
                check = v and v[0].value != 'on'
            if k and k.endswith('.subscribed') and check:
                field_name = k.replace('.subscribed', '')
        assert field_name, "Can't find unsubscribed tool for user"
        return field_name

    @td.with_user_project('test-admin')
    def test_prefs_subscriptions_subscribe(self):
        resp = self.app.get('/auth/subscriptions/',
                            extra_environ=dict(username=str('test-admin')))
        form = self._find_subscriptions_form(resp)
        # find not subscribed tool, subscribe and verify
        field_name = self._find_subscriptions_field(form, subscribed=False)
        t_id = ObjectId(form.fields[field_name + '.tool_id'][0].value)
        p_id = ObjectId(form.fields[field_name + '.project_id'][0].value)
        subscribed = M.Mailbox.subscribed(project_id=p_id, app_config_id=t_id)
        assert not subscribed, "User already subscribed for tool %s" % t_id
        form.fields[field_name + '.subscribed'][0].value = 'on'
        form.submit()
        subscribed = M.Mailbox.subscribed(project_id=p_id, app_config_id=t_id)
        assert subscribed, "User is not subscribed for tool %s" % t_id

    @td.with_user_project('test-admin')
    def test_prefs_subscriptions_unsubscribe(self):
        resp = self.app.get('/auth/subscriptions/',
                            extra_environ=dict(username=str('test-admin')))
        form = self._find_subscriptions_form(resp)
        field_name = self._find_subscriptions_field(form, subscribed=True)
        s_id = ObjectId(form.fields[field_name + '.subscription_id'][0].value)
        s = M.Mailbox.query.get(_id=s_id)
        assert s, "User has not subscription with Mailbox._id = %s" % s_id
        form.fields[field_name + '.subscribed'][0].value = None
        form.submit()
        s = M.Mailbox.query.get(_id=s_id)
        assert not s, "User still has subscription with Mailbox._id %s" % s_id

    def test_format_email(self):
        self.app.get('/').follow()  # establish session
        self.app.post('/auth/subscriptions/update_subscriptions',
                      params={'email_format': 'plain', 'subscriptions': '',
                              '_session_id': self.app.cookies['_session_id']})
        r = self.app.get('/auth/subscriptions/')
        assert '<option selected value="plain">Plain Text</option>' in r
        self.app.post('/auth/subscriptions/update_subscriptions',
                      params={'email_format': 'both', 'subscriptions': '',
                              '_session_id': self.app.cookies['_session_id']})
        r = self.app.get('/auth/subscriptions/')
        assert '<option selected value="both">HTML</option>' in r

    def test_create_account(self):
        r = self.app.get('/auth/create_account')
        assert 'Create an Account' in r
        r = self.app.post('/auth/save_new',
                          params=dict(username='AAA', pw='123',
                                      _session_id=self.app.cookies['_session_id']))
        assert_in('Enter a value 6 characters long or more', r)
        assert_in('Usernames must include only small letters, numbers, '
                  'and dashes. They must also start with a letter and be '
                  'at least 3 characters long.', r)
        r = self.app.post(
            '/auth/save_new',
            params=dict(
                username='aaa',
                pw='12345678',
                pw2='12345678',
                display_name='Test Me',
                _session_id=self.app.cookies['_session_id'],
            ))
        r = r.follow().follow()
        assert 'User "aaa" registered' in unentity(r.text)
        r = self.app.post(
            '/auth/save_new',
            params=dict(
                username='aaa',
                pw='12345678',
                pw2='12345678',
                display_name='Test Me',
                _session_id=self.app.cookies['_session_id'],
            ))
        assert 'That username is already taken. Please choose another.' in r
        r = self.app.get('/auth/logout')
        r = self.app.post(
            '/auth/do_login',
            params=dict(username='aaa', password='12345678',
                        _session_id=self.app.cookies['_session_id']), antispam=True,
            status=302)

    def test_create_account_require_email(self):
        self.app.get('/').follow()  # establish session
        with h.push_config(config, **{'auth.require_email_addr': 'false'}):
            self.app.post(
                '/auth/save_new',
                params=dict(
                    username='aaa',
                    pw='12345678',
                    pw2='12345678',
                    display_name='Test Me',
                    email='test@example.com',
                    _session_id=self.app.cookies['_session_id'],
                ))
            user = M.User.query.get(username='aaa')
            assert not user.pending
            assert_equal(M.Project.query.find({'name': 'u/aaa'}).count(), 1)
        with h.push_config(config, **{'auth.require_email_addr': 'true'}):
            self.app.post(
                '/auth/save_new',
                params=dict(
                    username='bbb',
                    pw='12345678',
                    pw2='12345678',
                    display_name='Test Me',
                    email='test@example.com',
                    _session_id=self.app.cookies['_session_id']
                ))
            user = M.User.query.get(username='bbb')
            assert user.pending
            assert_equal(M.Project.query.find({'name': 'u/bbb'}).count(), 0)

    def test_verify_email(self):
        with h.push_config(config, **{'auth.require_email_addr': 'true'}):
            self.app.get('/').follow()  # establish session
            r = self.app.post(
                '/auth/save_new',
                params=dict(
                    username='aaa',
                    pw='12345678',
                    pw2='12345678',
                    display_name='Test Me',
                    email='test@example.com',
                    _session_id=self.app.cookies['_session_id']
                ))
            r = r.follow()
            user = M.User.query.get(username='aaa')
            em = M.EmailAddress.get(email='test@example.com')
            assert user._id == em.claimed_by_user_id
            r = self.app.get('/auth/verify_addr', params=dict(a=em.nonce))
            user = M.User.query.get(username='aaa')
            em = M.EmailAddress.get(email='test@example.com')
            assert not user.pending
            assert em.confirmed
            assert user.get_pref('email_address')
            assert_equal(M.Project.query.find({'name': 'u/aaa'}).count(), 1)

    def test_create_account_disabled_header_link(self):
        with h.push_config(config, **{'auth.allow_user_registration': 'false'}):
            r = self.app.get('/')
            assert 'Register' not in r

    def test_create_account_disabled_form_gone(self):
        with h.push_config(config, **{'auth.allow_user_registration': 'false'}):
            r = self.app.get('/auth/create_account', status=404)
            assert 'Create an Account' not in r

    def test_create_account_disabled_submit_fails(self):
        with h.push_config(config, **{'auth.allow_user_registration': 'false'}):
            self.app.get('/').follow()  # establish session
            self.app.post('/auth/save_new',
                          params=dict(
                              username='aaa',
                              pw='12345678',
                              pw2='12345678',
                              display_name='Test Me',
                              _session_id=self.app.cookies['_session_id']
                          ),
                          status=404)

    def test_one_project_role(self):
        """Make sure when a user goes to a new project only one project role is created.
           There was an issue with extra project roles getting created if a user went directly to
           an admin page."""
        p_nbhd = M.Neighborhood.query.get(name='Projects')
        p = M.Project.query.get(shortname='test', neighborhood_id=p_nbhd._id)
        self.app.get('/').follow()  # establish session
        self.app.post('/auth/save_new', params=dict(
            username='aaa',
            pw='12345678',
            pw2='12345678',
            display_name='Test Me',
            email='test@example.com',
            _session_id=self.app.cookies['_session_id'],
        )).follow()
        user = M.User.query.get(username='aaa')
        user.pending = False
        session(user).flush(user)
        assert M.ProjectRole.query.find(
            dict(user_id=user._id, project_id=p._id)).count() == 0

        self.app.get('/p/test/admin/permissions',
                     extra_environ=dict(username=str('aaa')), status=403)
        assert M.ProjectRole.query.find(
            dict(user_id=user._id, project_id=p._id)).count() <= 1

    def test_default_lookup(self):
        # Make sure that default _lookup() throws 404
        self.app.get('/auth/foobar', status=404)

    def test_disabled_user(self):
        user = M.User.query.get(username='test-admin')
        sess = session(user)
        assert not user.disabled
        r = self.app.get('/p/test/admin/',
                         extra_environ={'username': str('test-admin')})
        assert_equal(r.status_int, 200, 'Redirect to %s' % r.location)
        user.disabled = True
        sess.save(user)
        sess.flush()
        user = M.User.query.get(username='test-admin')
        assert user.disabled
        r = self.app.get('/p/test/admin/',
                         extra_environ={'username': str('test-admin')})
        assert_equal(r.status_int, 302)
        assert_equal(r.location, 'http://localhost/auth/?return_to=%2Fp%2Ftest%2Fadmin%2F')

    def test_no_open_return_to(self):
        r = self.app.get('/auth/logout').follow().follow()
        r = self.app.post('/auth/do_login', params=dict(
            username='test-user', password='foo',
            return_to='/foo',
            _session_id=self.app.cookies['_session_id']),
            antispam=True
        )
        assert_equal(r.location, 'http://localhost/foo')

        r = self.app.get('/auth/logout')
        r = self.app.post('/auth/do_login', antispam=True, params=dict(
            username='test-user', password='foo',
            return_to='http://localhost/foo',
            _session_id=self.app.cookies['_session_id']))
        assert_equal(r.location, 'http://localhost/foo')

        r = self.app.get('/auth/logout')
        r = self.app.post('/auth/do_login', antispam=True, params=dict(
            username='test-user', password='foo',
            return_to='http://example.com/foo',
            _session_id=self.app.cookies['_session_id'])).follow()
        assert_equal(r.location, 'http://localhost/dashboard')

        r = self.app.get('/auth/logout')
        r = self.app.post('/auth/do_login', antispam=True, params=dict(
            username='test-user', password='foo',
            return_to='//example.com/foo',
            _session_id=self.app.cookies['_session_id'])).follow()
        assert_equal(r.location, 'http://localhost/dashboard')

    def test_no_injected_headers_in_return_to(self):
        r = self.app.get('/auth/logout').follow().follow()
        r = self.app.post('/auth/do_login', params=dict(
            username='test-user', password='foo',
            return_to='/foo\nContent-Length: 777',
            # WebTest actually will raise an error if there's an invalid header (webob itself does not)
            _session_id=self.app.cookies['_session_id']),
            antispam=True
        )
        assert_equal(r.location, 'http://localhost/')
        assert_not_equal(r.content_length, 777)


class TestPreferences(TestController):
    @td.with_user_project('test-admin')
    def test_personal_data(self):
        from pytz import country_names

        setsex, setbirthdate, setcountry, setcity, settimezone = \
            ('Male', '19/08/1988', 'IT', 'Milan', 'Europe/Rome')
        self.app.get('/auth/user_info/')

        # Check if personal data is properly set
        r = self.app.post('/auth/user_info/change_personal_data',
                          params=dict(
                              sex=setsex,
                              birthdate=setbirthdate,
                              country=setcountry,
                              city=setcity,
                              timezone=settimezone,
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        sex = user.sex
        assert sex == setsex
        birthdate = user.birthdate.strftime('%d/%m/%Y')
        assert birthdate == setbirthdate
        country = user.localization.country
        assert country_names.get(setcountry) == country
        city = user.localization.city
        assert city == setcity
        timezone = user.timezone
        assert timezone == settimezone

        # Check if setting a wrong date everything works correctly
        r = self.app.post('/auth/user_info/change_personal_data',
                          params=dict(birthdate='30/02/1998', _session_id=self.app.cookies['_session_id']))
        assert 'Please enter a valid date' in r.text
        user = M.User.query.get(username='test-admin')
        sex = user.sex
        assert sex == setsex
        birthdate = user.birthdate.strftime('%d/%m/%Y')
        assert birthdate == setbirthdate
        country = user.localization.country
        assert country_names.get(setcountry) == country
        city = user.localization.city
        assert city == setcity
        timezone = user.timezone
        assert timezone == settimezone

        # Check deleting birthdate
        r = self.app.post('/auth/user_info/change_personal_data',
                          params=dict(
                              sex=setsex,
                              birthdate='',
                              country=setcountry,
                              city=setcity,
                              timezone=settimezone,
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        assert user.birthdate is None

    @td.with_user_project('test-admin')
    def test_contacts(self):
        # Add skype account
        testvalue = 'testaccount'
        self.app.get('/auth/user_info/contacts/')
        self.app.post('/auth/user_info/contacts/skype_account',
                      params=dict(skypeaccount=testvalue, _session_id=self.app.cookies['_session_id']))
        user = M.User.query.get(username='test-admin')
        assert user.skypeaccount == testvalue

        # Add social network account
        socialnetwork = 'Facebook'
        accounturl = 'http://www.facebook.com/test'
        self.app.post('/auth/user_info/contacts/add_social_network',
                      params=dict(socialnetwork=socialnetwork,
                                  accounturl=accounturl,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert len(user.socialnetworks) == 1
        assert_equal(user.socialnetworks[0].socialnetwork, socialnetwork)
        assert_equal(user.socialnetworks[0].accounturl, accounturl)

        # Add second social network account
        socialnetwork2 = 'Twitter'
        accounturl2 = 'http://twitter.com/test'
        self.app.post('/auth/user_info/contacts/add_social_network',
                      params=dict(socialnetwork=socialnetwork2,
                                  accounturl='@test',
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert len(user.socialnetworks) == 2
        assert_in({'socialnetwork': socialnetwork, 'accounturl': accounturl}, user.socialnetworks)
        assert_in({'socialnetwork': socialnetwork2, 'accounturl': accounturl2}, user.socialnetworks)

        # Remove first social network account
        self.app.post('/auth/user_info/contacts/remove_social_network',
                      params=dict(socialnetwork=socialnetwork,
                                  account=accounturl,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert len(user.socialnetworks) == 1
        assert_in({'socialnetwork': socialnetwork2, 'accounturl': accounturl2}, user.socialnetworks)

        # Add empty social network account
        self.app.post('/auth/user_info/contacts/add_social_network',
                      params=dict(accounturl=accounturl, socialnetwork='',
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert len(user.socialnetworks) == 1
        assert_in({'socialnetwork': socialnetwork2, 'accounturl': accounturl2}, user.socialnetworks)

        # Add invalid social network account
        self.app.post('/auth/user_info/contacts/add_social_network',
                      params=dict(accounturl=accounturl, socialnetwork='invalid',
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert len(user.socialnetworks) == 1
        assert_in({'socialnetwork': socialnetwork2, 'accounturl': accounturl2}, user.socialnetworks)

        # Add telephone number
        telnumber = '+3902123456'
        self.app.post('/auth/user_info/contacts/add_telnumber',
                      params=dict(newnumber=telnumber,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert (len(user.telnumbers) == 1 and (user.telnumbers[0] == telnumber))

        # Add second telephone number
        telnumber2 = '+3902654321'
        self.app.post('/auth/user_info/contacts/add_telnumber',
                      params=dict(newnumber=telnumber2,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert (len(user.telnumbers) == 2 and telnumber in user.telnumbers and telnumber2 in user.telnumbers)

        # Remove first telephone number
        self.app.post('/auth/user_info/contacts/remove_telnumber',
                      params=dict(oldvalue=telnumber,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert (len(user.telnumbers) == 1 and telnumber2 in user.telnumbers)

        # Add website
        website = 'http://www.testurl.com'
        self.app.post('/auth/user_info/contacts/add_webpage',
                      params=dict(newwebsite=website,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert (len(user.webpages) == 1 and (website in user.webpages))

        # Add second website
        website2 = 'http://www.testurl2.com'
        self.app.post('/auth/user_info/contacts/add_webpage',
                      params=dict(newwebsite=website2,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert (len(user.webpages) == 2 and website in user.webpages and website2 in user.webpages)

        # Remove first website
        self.app.post('/auth/user_info/contacts/remove_webpage',
                      params=dict(oldvalue=website,
                                  _session_id=self.app.cookies['_session_id'],
                                  ))
        user = M.User.query.get(username='test-admin')
        assert (len(user.webpages) == 1 and website2 in user.webpages)

    @td.with_user_project('test-admin')
    def test_availability(self):
        # Add availability timeslot
        weekday = 'Monday'
        starttime = time(9, 0, 0)
        endtime = time(12, 0, 0)

        self.app.get('/auth/user_info/availability/')
        r = self.app.post('/auth/user_info/availability/add_timeslot',
                          params=dict(
                              weekday=weekday,
                              starttime=starttime.strftime('%H:%M'),
                              endtime=endtime.strftime('%H:%M'),
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        timeslot1dict = dict(week_day=weekday, start_time=starttime, end_time=endtime)
        assert len(user.availability) == 1 and timeslot1dict in user.get_availability_timeslots()

        weekday2 = 'Tuesday'
        starttime2 = time(14, 0, 0)
        endtime2 = time(16, 0, 0)

        # Add second availability timeslot
        r = self.app.post('/auth/user_info/availability/add_timeslot',
                          params=dict(
                              weekday=weekday2,
                              starttime=starttime2.strftime('%H:%M'),
                              endtime=endtime2.strftime('%H:%M'),
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        timeslot2dict = dict(week_day=weekday2, start_time=starttime2, end_time=endtime2)
        assert len(user.availability) == 2
        assert_in(timeslot1dict, user.get_availability_timeslots())
        assert_in(timeslot2dict, user.get_availability_timeslots())

        # Remove availability timeslot
        r = self.app.post('/auth/user_info/availability/remove_timeslot',
                          params=dict(
                              weekday=weekday,
                              starttime=starttime.strftime('%H:%M'),
                              endtime=endtime.strftime('%H:%M'),
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        assert len(user.availability) == 1 and timeslot2dict in user.get_availability_timeslots()

        # Add invalid availability timeslot
        r = self.app.post('/auth/user_info/availability/add_timeslot',
                          params=dict(
                              weekday=weekday2,
                              starttime=endtime2.strftime('%H:%M'),
                              endtime=starttime2.strftime('%H:%M'),
                              _session_id=self.app.cookies['_session_id'],
                          ))
        assert 'Invalid period:' in str(r)
        user = M.User.query.get(username='test-admin')
        timeslot2dict = dict(week_day=weekday2, start_time=starttime2, end_time=endtime2)
        assert len(user.availability) == 1 and timeslot2dict in user.get_availability_timeslots()

    @td.with_user_project('test-admin')
    def test_inactivity(self):
        # Add inactivity period
        now = datetime.utcnow().date()
        now = datetime(now.year, now.month, now.day)
        startdate = now + timedelta(days=1)
        enddate = now + timedelta(days=7)
        self.app.get('/auth/user_info/availability/')
        r = self.app.post('/auth/user_info/availability/add_inactive_period',
                          params=dict(
                              startdate=startdate.strftime('%d/%m/%Y'),
                              enddate=enddate.strftime('%d/%m/%Y'),
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        period1dict = dict(start_date=startdate, end_date=enddate)
        assert len(user.inactiveperiod) == 1 and period1dict in user.get_inactive_periods()

        # Add second inactivity period
        startdate2 = now + timedelta(days=24)
        enddate2 = now + timedelta(days=28)
        r = self.app.post('/auth/user_info/availability/add_inactive_period',
                          params=dict(
                              startdate=startdate2.strftime('%d/%m/%Y'),
                              enddate=enddate2.strftime('%d/%m/%Y'),
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        period2dict = dict(start_date=startdate2, end_date=enddate2)
        assert len(user.inactiveperiod) == 2
        assert_in(period1dict, user.get_inactive_periods())
        assert_in(period2dict, user.get_inactive_periods())

        # Remove first inactivity period
        r = self.app.post(
            '/auth/user_info/availability/remove_inactive_period',
            params=dict(
                startdate=startdate.strftime('%d/%m/%Y'),
                enddate=enddate.strftime('%d/%m/%Y'),
                _session_id=self.app.cookies['_session_id'],
            ))
        user = M.User.query.get(username='test-admin')
        assert len(user.inactiveperiod) == 1 and period2dict in user.get_inactive_periods()

        # Add invalid inactivity period
        r = self.app.post('/auth/user_info/availability/add_inactive_period',
                          params=dict(
                              startdate='NOT/A/DATE',
                              enddate=enddate2.strftime('%d/%m/%Y'),
                              _session_id=self.app.cookies['_session_id'],
                          ))
        user = M.User.query.get(username='test-admin')
        assert 'Please enter a valid date' in str(r)
        assert len(user.inactiveperiod) == 1 and period2dict in user.get_inactive_periods()

    @td.with_user_project('test-admin')
    def test_skills(self):
        setup_trove_categories()
        # Add a skill
        skill_cat = M.TroveCategory.query.get(show_as_skill=True)
        level = 'low'
        comment = 'test comment'
        self.app.get('/auth/user_info/skills/')
        self.app.post('/auth/user_info/skills/save_skill',
                      params=dict(
                          level=level,
                          comment=comment,
                          selected_skill=str(skill_cat.trove_cat_id),
                          _session_id=self.app.cookies['_session_id'],
                      ))
        user = M.User.query.get(username='test-admin')
        skilldict = dict(category_id=skill_cat._id,
                         comment=comment, level=level)
        assert len(user.skills) == 1 and skilldict in user.skills

        # Add again the same skill
        level = 'medium'
        comment = 'test comment 2'
        self.app.get('/auth/user_info/skills/')
        self.app.post('/auth/user_info/skills/save_skill',
                      params=dict(
                          level=level,
                          comment=comment,
                          selected_skill=str(skill_cat.trove_cat_id),
                          _session_id=self.app.cookies['_session_id'],
                      ))
        user = M.User.query.get(username='test-admin')
        skilldict = dict(category_id=skill_cat._id,
                         comment=comment, level=level)
        assert len(user.skills) == 1 and skilldict in user.skills

        # Add an invalid skill
        level2 = 'not a level'
        comment2 = 'test comment 2'
        self.app.post('/auth/user_info/skills/save_skill',
                      params=dict(
                          level=level2,
                          comment=comment2,
                          selected_skill=str(skill_cat.trove_cat_id),
                          _session_id=self.app.cookies['_session_id'],
                      ))
        user = M.User.query.get(username='test-admin')
        # Check that everything is as it was before
        assert len(user.skills) == 1 and skilldict in user.skills

        # Remove a skill
        self.app.get('/auth/user_info/skills/')
        self.app.post('/auth/user_info/skills/remove_skill',
                      params=dict(
                          categoryid=str(skill_cat.trove_cat_id),
                          _session_id=self.app.cookies['_session_id'],
                      ))
        user = M.User.query.get(username='test-admin')
        assert len(user.skills) == 0

    @td.with_user_project('test-admin')
    def test_user_message(self):
        self.app.get('/').follow()  # establish session
        assert not M.User.query.get(username='test-admin').get_pref('disable_user_messages')
        self.app.post('/auth/preferences/user_message',
                      params={'_session_id': self.app.cookies['_session_id'],
                              })
        assert M.User.query.get(username='test-admin').get_pref('disable_user_messages')
        self.app.post('/auth/preferences/user_message',
                      params={'allow_user_messages': 'on',
                              '_session_id': self.app.cookies['_session_id'],
                              })
        assert not M.User.query.get(username='test-admin').get_pref('disable_user_messages')

    @td.with_user_project('test-admin')
    def test_additional_page(self):
        class MyPP(plugin.UserPreferencesProvider):
            def not_page(self):
                return 'not page'

            @expose()
            def new_page(self):
                return 'new page'

        with mock.patch.object(plugin.UserPreferencesProvider, 'get') as upp_get:
            upp_get.return_value = MyPP()
            r = self.app.get('/auth/new_page')
            assert_equal(r.text, 'new page')
            self.app.get('/auth/not_page', status=404)


class TestPasswordReset(TestController):
    test_primary_email = 'testprimaryaddr@mail.com'

    def setUp(self):
        super(TestPasswordReset, self).setUp()
        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': str('True')}

    @patch('allura.model.User.send_password_reset_email')
    @patch('allura.lib.plugin.LocalAuthenticationProvider.resend_verification_link')
    @patch('allura.tasks.mail_tasks.sendmail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_email_unconfirmed(self, gen_message_id, sendmail, p_sendlink, p_sendpwd):
        user = M.User.query.get(username='test-admin')
        user.pending = True
        email = M.EmailAddress.find(
            {'claimed_by_user_id': user._id}).first()
        email.confirmed = False
        ThreadLocalORMSession.flush_all()
        self.app.get('/').follow()  # establish session
        self.app.post('/auth/password_recovery_hash', {'email': email.email,
                                                       '_session_id': self.app.cookies['_session_id'],
                                                       })
        hash = user.get_tool_data('AuthPasswordReset', 'hash')
        assert hash is None
        p_sendlink.assert_called_once()
        p_sendpwd.assert_not_called()

    @patch('allura.tasks.mail_tasks.sendmail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_user_disabled(self, gen_message_id, sendmail):
        user = M.User.query.get(username='test-admin')
        email = M.EmailAddress.find(
            {'claimed_by_user_id': user._id}).first()
        user.disabled = True
        ThreadLocalORMSession.flush_all()
        self.app.get('/').follow()  # establish session
        self.app.post('/auth/password_recovery_hash', {'email': email.email,
                                                       '_session_id': self.app.cookies['_session_id'],
                                                       })
        hash = user.get_tool_data('AuthPasswordReset', 'hash')
        assert hash is None

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_only_primary_email_reset_allowed(self, gen_message_id, sendmail):
        self.app.get('/').follow()  # establish session
        user = M.User.query.get(username='test-admin')
        user.claim_address(self.test_primary_email)
        user.set_pref('email_address', self.test_primary_email)

        email = M.EmailAddress.find({'email': self.test_primary_email}).first()
        email.confirmed = True
        ThreadLocalORMSession.flush_all()

        with h.push_config(config, **{'auth.allow_non_primary_email_password_reset': 'false'}):
            self.app.post('/auth/password_recovery_hash', {'email': self.test_primary_email,
                                                           '_session_id': self.app.cookies['_session_id'],
                                                           })
            hash = user.get_tool_data('AuthPasswordReset', 'hash')
            assert hash is not None
            args, kwargs = sendmail.post.call_args
            assert_equal(kwargs['toaddr'], self.test_primary_email)

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_non_primary_email_reset_allowed(self, gen_message_id, sendmail):
        self.app.get('/').follow()  # establish session
        user = M.User.query.get(username='test-admin')
        email1 = M.EmailAddress.find({'claimed_by_user_id': user._id}).first()
        user.claim_address(self.test_primary_email)
        user.set_pref('email_address', self.test_primary_email)
        email = M.EmailAddress.find({'email': self.test_primary_email}).first()
        email.confirmed = True
        ThreadLocalORMSession.flush_all()
        with h.push_config(config, **{'auth.allow_non_primary_email_password_reset': 'true'}):
            self.app.post('/auth/password_recovery_hash', {'email': email1.email,
                                                           '_session_id': self.app.cookies['_session_id'],
                                                           })
            hash = user.get_tool_data('AuthPasswordReset', 'hash')
            assert hash is not None
            args, kwargs = sendmail.post.call_args
            assert_equal(kwargs['toaddr'], email1.email)

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_password_reset(self, gen_message_id, sendsimplemail):
        self.app.get('/').follow()  # establish session
        user = M.User.query.get(username='test-admin')
        email = M.EmailAddress.find({'claimed_by_user_id': user._id}).first()
        email.confirmed = True
        ThreadLocalORMSession.flush_all()
        old_pw_hash = user.password

        # request a reset
        with td.audits('Password recovery link sent to: ' + email.email, user=True):
            r = self.app.post('/auth/password_recovery_hash', {'email': email.email,
                                                               '_session_id': self.app.cookies['_session_id'],
                                                               })
        # confirm some fields
        hash = user.get_tool_data('AuthPasswordReset', 'hash')
        hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')
        assert hash is not None
        assert hash_expiry is not None

        # confirm email sent
        text = '''Your username is test-admin

To update your password on %s, please visit the following URL:

%s/auth/forgotten_password/%s''' % (config['site_name'], config['base_url'], hash)
        sendsimplemail.post.assert_called_once_with(
            sender='noreply@localhost',
            toaddr=email.email,
            fromaddr='"{}" <{}>'.format(config['site_name'], config['forgemail.return_path']),
            reply_to=config['forgemail.return_path'],
            subject='Allura Password recovery',
            message_id=gen_message_id(),
            text=text)

        # load reset form and fill it out
        r = self.app.get('/auth/forgotten_password/%s' % hash)
        assert_in('Enter a new password for: test-admin', r)
        assert_in('New Password:', r)
        assert_in('New Password (again):', r)
        form = r.forms[0]
        form['pw'] = form['pw2'] = new_password = '154321'
        with td.audits('Password changed \(through recovery process\)', user=True):
            # escape parentheses, so they would not be treated as regex group
            r = form.submit()

        # verify 'Password Changed' email sent
        args, kwargs = sendsimplemail.post.call_args
        assert_equal(kwargs['toaddr'], user._id)
        assert_equal(kwargs['subject'], 'Password Changed')

        # confirm password changed and works
        user = M.User.query.get(username='test-admin')
        assert_not_equal(old_pw_hash, user.password)
        provider = plugin.LocalAuthenticationProvider(None)
        assert_true(provider._validate_password(user, new_password))

        # confirm reset fields cleared
        user = M.User.query.get(username='test-admin')
        hash = user.get_tool_data('AuthPasswordReset', 'hash')
        hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')
        assert_equal(hash, '')
        assert_equal(hash_expiry, '')

        # confirm can log in now in same session
        r = r.follow()
        assert 'Log Out' not in r, r
        form = r.forms[0]
        encoded = self.app.antispam_field_names(r.form)
        form[encoded['username']] = 'test-admin'
        form[encoded['password']] = new_password
        r = form.submit(status=302)
        r = r.follow().follow()
        assert 'Log Out' in r, r

    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_hash_expired(self, gen_message_id, sendmail):
        user = M.User.query.get(username='test-admin')
        email = M.EmailAddress.find(
            {'claimed_by_user_id': user._id}).first()
        email.confirmed = True
        ThreadLocalORMSession.flush_all()
        self.app.get('/').follow()  # establish session
        r = self.app.post('/auth/password_recovery_hash', {'email': email.email,
                                                           '_session_id': self.app.cookies['_session_id'],
                                                           })
        user = M.User.by_username('test-admin')
        hash = user.get_tool_data('AuthPasswordReset', 'hash')
        user.set_tool_data('AuthPasswordReset',
                           hash_expiry=datetime(2000, 10, 10))
        r = self.app.get('/auth/forgotten_password/%s' % hash.encode('utf-8'))
        assert_in('Unable to process reset, please try again', r.follow().text)
        r = self.app.post('/auth/set_new_password/%s' %
                          hash.encode('utf-8'), {'pw': '154321', 'pw2': '154321',
                                                 '_session_id': self.app.cookies['_session_id'],
                                                 })
        assert_in('Unable to process reset, please try again', r.follow().text)

    def test_hash_invalid(self):
        r = self.app.get('/auth/forgotten_password/123412341234', status=302)
        assert_in('Unable to process reset, please try again', r.follow().text)

    @patch('allura.lib.plugin.AuthenticationProvider')
    def test_provider_disabled(self, AP):
        user = M.User.query.get(username='test-admin')
        ap = AP.get()
        ap.forgotten_password_process = False
        ap.authenticate_request()._id = user._id
        ap.by_username().username = user.username
        self.app.get('/auth/forgotten_password', status=404)
        self.app.get('/').follow()  # establish session
        self.app.post('/auth/set_new_password',
                      {'pw': 'foo', 'pw2': 'foo', '_session_id': self.app.cookies['_session_id']},
                      status=404)
        self.app.post('/auth/password_recovery_hash',
                      {'email': 'foo', '_session_id': self.app.cookies['_session_id']},
                      status=404)

    @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
    @patch('allura.tasks.mail_tasks.sendsimplemail')
    @patch('allura.lib.helpers.gen_message_id')
    def test_pwd_reset_hibp_check(self, gen_message_id, sendmail):
        self.app.get('/').follow()  # establish session
        user = M.User.query.get(username='test-admin')
        email = M.EmailAddress.find({'claimed_by_user_id': user._id}).first()
        email.confirmed = True
        ThreadLocalORMSession.flush_all()

        # request a reset
        r = self.app.post('/auth/password_recovery_hash', {'email': email.email,
                                                           '_session_id': self.app.cookies['_session_id'],
                                                           })
        hash = user.get_tool_data('AuthPasswordReset', 'hash')

        # load reset form and fill it out with weak password
        r = self.app.get('/auth/forgotten_password/%s' % hash)
        form = r.forms[0]
        form['pw'] = form['pw2'] = new_password = 'password'
        r = form.submit()
        assert 'Unsafe' in str(r.headers)

        # fill it out again, with a stronger password
        r = r.follow()
        form = r.forms[0]
        form['pw'] = form['pw2'] = new_password = 'oj35h9u34280j924hnuiw'  # something unlikely to trip at hibp
        r = form.submit()
        assert 'Unsafe' not in str(r.headers)

        # confirm password changed and works
        user = M.User.query.get(username='test-admin')
        provider = plugin.LocalAuthenticationProvider(None)
        assert_true(provider._validate_password(user, new_password))

        # confirm can log in now in same session
        r = r.follow()
        assert 'Log Out' not in r, r
        form = r.forms[0]
        encoded = self.app.antispam_field_names(r.form)
        form[encoded['username']] = 'test-admin'
        form[encoded['password']] = new_password
        r = form.submit(status=302)
        r = r.follow().follow()
        assert 'Log Out' in r, r


class TestOAuth(TestController):
    def test_register_deregister_app(self):
        # register
        r = self.app.get('/auth/oauth/')
        r = self.app.post('/auth/oauth/register',
                          params={'application_name': 'oautstapp', 'application_description': 'Oauth rulez',
                                  '_session_id': self.app.cookies['_session_id'],
                                  }).follow()
        assert 'oautstapp' in r
        # deregister
        assert_equal(r.forms[0].action, 'deregister')
        r.forms[0].submit()
        r = self.app.get('/auth/oauth/')
        assert 'oautstapp' not in r

    def test_generate_revoke_access_token(self):
        # generate
        self.app.get('/').follow()  # establish session
        r = self.app.post('/auth/oauth/register',
                          params={'application_name': 'oautstapp', 'application_description': 'Oauth rulez',
                                  '_session_id': self.app.cookies['_session_id'],
                                  }, status=302)
        r = self.app.get('/auth/oauth/')
        assert_equal(r.forms[1].action, 'generate_access_token')
        r = r.forms[1].submit(extra_environ={'username': str('test-user')})  # not the right user
        assert_in("Invalid app ID", self.webflash(r))                   # gets an error
        r = self.app.get('/auth/oauth/')                                # do it again
        r = r.forms[1].submit()                                         # as correct user
        assert_equal('', self.webflash(r))

        r = self.app.get('/auth/oauth/')
        assert 'Bearer Token:' in r
        assert_not_equal(
            M.OAuthAccessToken.for_user(M.User.by_username('test-admin')), [])
        # revoke
        assert_equal(r.forms[0].action, 'revoke_access_token')
        r.forms[0].submit()
        r = self.app.get('/auth/oauth/')
        assert_not_equal(r.forms[0].action, 'revoke_access_token')
        assert_equal(
            M.OAuthAccessToken.for_user(M.User.by_username('test-admin')), [])

    def test_interactive(self):
        with mock.patch('allura.controllers.rest.oauth.Server') as Server, \
                mock.patch('allura.controllers.rest.oauth.Request') as Request:   # these are the oauth2 libs
            user = M.User.by_username('test-admin')
            M.OAuthConsumerToken(
                api_key='api_key',
                user_id=user._id,
                description='ctok_desc',
            )
            ThreadLocalORMSession.flush_all()
            Request.from_request.return_value = {
                'oauth_consumer_key': 'api_key',
                'oauth_callback': 'http://my.domain.com/callback',
            }
            r = self.app.post('/rest/oauth/request_token', params={})
            rtok = parse_qs(r.text)['oauth_token'][0]
            r = self.app.post('/rest/oauth/authorize',
                              params={'oauth_token': rtok})
            r = r.forms[0].submit('yes')
            assert r.location.startswith('http://my.domain.com/callback')
            pin = parse_qs(urlparse(r.location).query)['oauth_verifier'][0]
            Request.from_request.return_value = {
                'oauth_consumer_key': 'api_key',
                'oauth_token': rtok,
                'oauth_verifier': pin,
            }
            r = self.app.get('/rest/oauth/access_token')
            atok = parse_qs(r.text)
            assert_equal(len(atok['oauth_token']), 1)
            assert_equal(len(atok['oauth_token_secret']), 1)

        # now use the tokens & secrets to make a full OAuth request:
        oauth_secret = atok['oauth_token_secret'][0]
        oauth_token = atok['oauth_token'][0]
        consumer = oauth2.Consumer('api_key', oauth_secret)
        M.OAuthConsumerToken.consumer = consumer
        access_token = oauth2.Token(oauth_token, oauth_secret)
        oauth_client = oauth2.Client(consumer, access_token)
        # use the oauth2 lib, but intercept the request and then send it to self.app.get
        with mock.patch('oauth2.httplib2.Http.request', name='hl2req') as oa2_req:
            oauth_client.request('http://localhost/rest/p/test/', 'GET')
            oa2url = oa2_req.call_args[0][1]
            oa2url = oa2url.replace('http://localhost', '')
            print(oa2url)
            oa2kwargs = oa2_req.call_args[1]
        self.app.get(oa2url, headers=oa2kwargs['headers'], status=200)
        self.app.get(oa2url.replace('oauth_signature=', 'removed='), headers=oa2kwargs['headers'], status=401)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_valid(self, Request, Server):
        M.OAuthConsumerToken.consumer = mock.Mock()
        user = M.User.by_username('test-user')
        consumer_token = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
        )
        ThreadLocalORMSession.flush_all()
        req = Request.from_request.return_value = {'oauth_consumer_key': 'api_key'}
        r = self.app.post('/rest/oauth/request_token', params={'key': 'value'})

        # dict-ify webob.EnvironHeaders
        call = Request.from_request.call_args_list[0]
        call[1]['headers'] = dict(call[1]['headers'])
        # then check equality
        assert_equal(Request.from_request.call_args_list, [
            mock.call('POST', 'http://localhost/rest/oauth/request_token',
                      headers={'Host': 'localhost:80',
                               'Content-Type': 'application/x-www-form-urlencoded',
                               'Content-Length': '9'},
                      parameters={'key': 'value'},
                      query_string='')
        ])
        Server().verify_request.assert_called_once_with(req, consumer_token.consumer, None)
        request_token = M.OAuthRequestToken.query.get(consumer_token_id=consumer_token._id)
        assert_is_not_none(request_token)
        assert_equal(r.text, request_token.to_string())

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_no_consumer_token_matching(self, Request, Server):
        Request.from_request.return_value = {'oauth_consumer_key': 'api_key'}
        self.app.post('/rest/oauth/request_token',
                      params={'key': 'value'}, status=401)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_no_consumer_token_given(self, Request, Server):
        Request.from_request.return_value = {}
        self.app.post('/rest/oauth/request_token', params={'key': 'value'}, status=401)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_request_token_invalid(self, Request, Server):
        Server().verify_request.side_effect = oauth2.Error('test_request_token_invalid')
        M.OAuthConsumerToken.consumer = mock.Mock()
        user = M.User.by_username('test-user')
        M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
        )
        ThreadLocalORMSession.flush_all()
        Request.from_request.return_value = {'oauth_consumer_key': 'api_key'}
        self.app.post('/rest/oauth/request_token', params={'key': 'value'}, status=401)

    def test_authorize_ok(self):
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='oob',
            user_id=user._id,
        )
        ThreadLocalORMSession.flush_all()
        r = self.app.post('/rest/oauth/authorize', params={'oauth_token': 'api_key'})
        assert_in('ctok_desc', r.text)
        assert_in('api_key', r.text)

    def test_authorize_invalid(self):
        self.app.post('/rest/oauth/authorize', params={'oauth_token': 'api_key'}, status=401)

    def test_do_authorize_no(self):
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='oob',
            user_id=user._id,
        )
        ThreadLocalORMSession.flush_all()
        self.app.post('/rest/oauth/do_authorize',
                      params={'no': '1', 'oauth_token': 'api_key'})
        assert_is_none(M.OAuthRequestToken.query.get(api_key='api_key'))

    def test_do_authorize_oob(self):
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='oob',
            user_id=user._id,
        )
        ThreadLocalORMSession.flush_all()
        r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key'})
        assert_is_not_none(r.html.find(text=re.compile('^PIN: ')))

    def test_do_authorize_cb(self):
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='http://my.domain.com/callback',
            user_id=user._id,
        )
        ThreadLocalORMSession.flush_all()
        r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key'})
        assert r.location.startswith('http://my.domain.com/callback?oauth_token=api_key&oauth_verifier=')

    def test_do_authorize_cb_params(self):
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='http://my.domain.com/callback?myparam=foo',
            user_id=user._id,
        )
        ThreadLocalORMSession.flush_all()
        r = self.app.post('/rest/oauth/do_authorize', params={'yes': '1', 'oauth_token': 'api_key'})
        assert r.location.startswith('http://my.domain.com/callback?myparam=foo&oauth_token=api_key&oauth_verifier=')

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_no_consumer(self, Request):
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_no_request(self, Request):
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        user = M.User.by_username('test-admin')
        M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        ThreadLocalORMSession.flush_all()
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_bad_pin(self, Request):
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'bad',
        }
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='http://my.domain.com/callback?myparam=foo',
            user_id=user._id,
            validation_pin='good',
        )
        ThreadLocalORMSession.flush_all()
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_bad_sig(self, Request, Server):
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='http://my.domain.com/callback?myparam=foo',
            user_id=user._id,
            validation_pin='good',
        )
        ThreadLocalORMSession.flush_all()
        Server().verify_request.side_effect = oauth2.Error('test_access_token_bad_sig')
        self.app.get('/rest/oauth/access_token', status=401)

    @mock.patch('allura.controllers.rest.oauth.Server')
    @mock.patch('allura.controllers.rest.oauth.Request')
    def test_access_token_ok(self, Request, Server):
        Request.from_request.return_value = {
            'oauth_consumer_key': 'api_key',
            'oauth_token': 'api_key',
            'oauth_verifier': 'good',
        }
        user = M.User.by_username('test-admin')
        ctok = M.OAuthConsumerToken(
            api_key='api_key',
            user_id=user._id,
            description='ctok_desc',
        )
        M.OAuthRequestToken(
            api_key='api_key',
            consumer_token_id=ctok._id,
            callback='http://my.domain.com/callback?myparam=foo',
            user_id=user._id,
            validation_pin='good',
        )
        ThreadLocalORMSession.flush_all()
        r = self.app.get('/rest/oauth/access_token')
        atok = parse_qs(r.text)
        assert_equal(len(atok['oauth_token']), 1)
        assert_equal(len(atok['oauth_token_secret']), 1)


class TestDisableAccount(TestController):
    def test_not_authenticated(self):
        r = self.app.get(
            '/auth/disable/',
            extra_environ={'username': str('*anonymous')})
        assert_equal(r.status_int, 302)
        assert_equal(r.location,
                     'http://localhost/auth/?return_to=%2Fauth%2Fdisable%2F')

    def test_lists_user_projects(self):
        r = self.app.get('/auth/disable/')
        user = M.User.by_username('test-admin')
        for p in user.my_projects_by_role_name('Admin'):
            if p.name == 'u/test-admin':
                continue
            assert_in(p.name, r)
            assert_in(p.url(), r)

    def test_has_asks_password(self):
        r = self.app.get('/auth/disable/')
        form = r.html.find('form', {'action': 'do_disable'})
        assert form is not None

    def test_bad_password(self):
        self.app.get('/').follow()  # establish session
        r = self.app.post('/auth/disable/do_disable', {'password': 'bad',
                                                       '_session_id': self.app.cookies['_session_id'], })
        assert_in('Invalid password', r)
        user = M.User.by_username('test-admin')
        assert_equal(user.disabled, False)

    def test_disable(self):
        self.app.get('/').follow()  # establish session
        r = self.app.post('/auth/disable/do_disable', {'password': 'foo',
                                                       '_session_id': self.app.cookies['_session_id'], })
        assert_equal(r.status_int, 302)
        assert_equal(r.location, 'http://localhost/')
        flash = json.loads(self.webflash(r))
        assert_equal(flash['status'], 'ok')
        assert_equal(flash['message'], 'Your account was successfully disabled!')
        user = M.User.by_username('test-admin')
        assert_equal(user.disabled, True)


class TestPasswordExpire(TestController):
    def login(self, username='test-user', pwd='foo', query_string=''):
        extra = {'username': str('*anonymous'), 'REMOTE_ADDR': str('127.0.0.1')}
        r = self.app.get('/auth/' + query_string, extra_environ=extra)

        f = r.forms[0]
        encoded = self.app.antispam_field_names(f)
        f[encoded['username']] = username
        f[encoded['password']] = pwd
        return f.submit(extra_environ={'username': str('*anonymous')})

    def assert_redirects(self, where='/'):
        resp = self.app.get(where, extra_environ={'username': str('test-user')}, status=302)
        assert_equal(resp.location, 'http://localhost/auth/pwd_expired?' + urlencode({'return_to': where}))

    def assert_not_redirects(self, where='/neighborhood'):
        self.app.get(where, extra_environ={'username': str('test-user')}, status=200)

    def test_disabled(self):
        r = self.login()
        assert_false(r.session.get('pwd-expired'))
        self.assert_not_redirects()

    def expired(self, r):
        return r.session.get('pwd-expired')

    def set_expire_for_user(self, username='test-user', days=100):
        user = M.User.by_username(username)
        user.last_password_updated = datetime.utcnow() - timedelta(days=days)
        session(user).flush(user)
        return user

    def test_days(self):
        self.set_expire_for_user()

        with h.push_config(config, **{'auth.pwdexpire.days': 180}):
            r = self.login()
            assert_false(self.expired(r))
            self.assert_not_redirects()

        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

    def test_before(self):
        self.set_expire_for_user()

        before = datetime.utcnow() - timedelta(days=180)
        before = calendar.timegm(before.timetuple())
        with h.push_config(config, **{'auth.pwdexpire.before': before}):
            r = self.login()
            assert_false(self.expired(r))
            self.assert_not_redirects()

        before = datetime.utcnow() - timedelta(days=90)
        before = calendar.timegm(before.timetuple())
        with h.push_config(config, **{'auth.pwdexpire.before': before}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

    def test_logout(self):
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()
            r = self.app.get('/auth/logout', extra_environ={'username': str('test-user')})
            assert_false(self.expired(r))
            self.assert_not_redirects()

    def test_change_pwd(self):
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

            user = M.User.by_username('test-user')
            old_update_time = user.last_password_updated
            old_password = user.password
            r = self.app.get('/auth/pwd_expired', extra_environ={'username': str('test-user')})
            f = r.forms[0]
            f['oldpw'] = 'foo'
            f['pw'] = 'qwerty'
            f['pw2'] = 'qwerty'
            r = f.submit(extra_environ={'username': str('test-user')}, status=302)
            assert_equal(r.location, 'http://localhost/')
            assert_false(self.expired(r))
            user = M.User.by_username('test-user')
            assert_true(user.last_password_updated > old_update_time)
            assert_not_equal(user.password, old_password)

            # Can log in with new password and change isn't required anymore
            r = self.login(pwd='qwerty').follow()
            assert_equal(r.location, 'http://localhost/dashboard')
            assert_not_in('Invalid login', r)
            assert_false(self.expired(r))
            self.assert_not_redirects()

            # and can't log in with old password
            r = self.login(pwd='foo')
            assert_in('Invalid login', r)

    def test_expired_pwd_change_invalidates_token(self):
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()
            user = M.User.by_username('test-user')
            user.set_tool_data('AuthPasswordReset',
                               hash="generated_hash_value",
                               hash_expiry="04-08-2020")
            hash = user.get_tool_data('AuthPasswordReset', 'hash')
            hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')
            assert_equal(hash, 'generated_hash_value')
            assert_equal(hash_expiry, '04-08-2020')
            session(user).flush(user)

            # Change expired password
            r = self.app.get('/auth/pwd_expired', extra_environ={'username': str('test-user')})
            f = r.forms[0]
            f['oldpw'] = 'foo'
            f['pw'] = 'qwerty'
            f['pw2'] = 'qwerty'
            r = f.submit(extra_environ={'username': str('test-user')}, status=302)
            assert_equal(r.location, 'http://localhost/')

            user = M.User.by_username('test-user')
            hash = user.get_tool_data('AuthPasswordReset', 'hash')
            hash_expiry = user.get_tool_data('AuthPasswordReset', 'hash_expiry')

            assert_equal(hash, '')
            assert_equal(hash_expiry, '')

    def check_validation(self, oldpw, pw, pw2):
        user = M.User.by_username('test-user')
        old_update_time = user.last_password_updated
        old_password = user.password
        r = self.app.get('/auth/pwd_expired', extra_environ={'username': str('test-user')})
        f = r.forms[0]
        f['oldpw'] = oldpw
        f['pw'] = pw
        f['pw2'] = pw2
        r = f.submit(extra_environ={'username': str('test-user')})
        assert_true(self.expired(r))
        user = M.User.by_username('test-user')
        assert_equal(user.last_password_updated, old_update_time)
        assert_equal(user.password, old_password)
        return r

    def test_change_pwd_validation(self):
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login()
            assert_true(self.expired(r))
            self.assert_redirects()

            r = self.check_validation('', '', '')
            assert_in('Please enter a value', r)
            r = self.check_validation('', 'qwe', 'qwerty')
            assert_in('Enter a value 6 characters long or more', r)
            r = self.check_validation('bad', 'qwerty1', 'qwerty')
            assert_in('Passwords must match', r)
            r = self.check_validation('bad', 'qwerty', 'qwerty')
            assert_in('Incorrect password', self.webflash(r))
            assert_equal(r.location, 'http://localhost/auth/pwd_expired?return_to=')

            with h.push_config(config, **{'auth.min_password_len': 3}):
                r = self.check_validation('foo', 'foo', 'foo')
                assert_in('Your old and new password should not be the same', r)

    def test_return_to(self):
        return_to = '/p/test/tickets/?milestone=1.0&page=2'
        self.set_expire_for_user()
        with h.push_config(config, **{'auth.pwdexpire.days': 90}):
            r = self.login(query_string='?' + urlencode({'return_to': return_to}))
            # don't go to the return_to yet
            assert_equal(r.location, 'http://localhost/auth/pwd_expired?' + urlencode({'return_to': return_to}))

            # but if user tries to go directly there anyway, intercept and redirect back
            self.assert_redirects(where=return_to)

            r = self.app.get('/auth/pwd_expired', extra_environ={'username': str('test-user')})
            f = r.forms[0]
            f['oldpw'] = 'foo'
            f['pw'] = 'qwerty'
            f['pw2'] = 'qwerty'
            f['return_to'] = return_to
            r = f.submit(extra_environ={'username': str('test-user')}, status=302)
            assert_equal(r.location, 'http://localhost/p/test/tickets/?milestone=1.0&page=2')


class TestCSRFProtection(TestController):
    def test_blocks_invalid(self):
        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': str('True'), 'REMOTE_ADDR': str('127.0.0.1')}

        # regular login
        r = self.app.get('/auth/')

        r = self.app.post('/auth/do_login', params=dict(
            username='test-admin', password='foo',
            _session_id=self.app.cookies['_session_id']),
            antispam=True)

        # regular form submit
        r = self.app.get('/admin/overview')
        r = r.form.submit()
        assert_equal(r.location, 'http://localhost/admin/overview')

        # invalid form submit
        r = self.app.get('/admin/overview')
        r.form['_session_id'] = 'bogus'
        r = r.form.submit()
        assert_equal(r.location, 'http://localhost/auth/')

    def test_blocks_invalid_on_login(self):
        r = self.app.get('/auth/')
        r.form['_session_id'] = 'bogus'
        r.form.submit(status=403)

    def test_token_present_on_first_request(self):
        r = self.app.get('/auth/')
        assert_true(r.form['_session_id'].value)


class TestTwoFactor(TestController):

    sample_key = b'\x00K\xda\xbfv\xc2B\xaa\x1a\xbe\xa5\x96b\xb2\xa0Z:\xc9\xcf\x8a'

    def _init_totp(self, username='test-admin'):
        user = M.User.query.get(username=username)
        totp_srv = TotpService().get()
        totp_srv.set_secret_key(user, self.sample_key)
        user.set_pref('multifactor', True)

    def test_settings_on(self):
        r = self.app.get('/auth/preferences/')
        assert r.html.find(attrs={'class': 'preferences multifactor'})

    def test_settings_off(self):
        with h.push_config(config, **{'auth.multifactor.totp': 'false'}):
            r = self.app.get('/auth/preferences/')
            assert not r.html.find(attrs={'class': 'preferences multifactor'})

            for url in ['/auth/preferences/totp_new',
                        '/auth/preferences/totp_view',
                        '/auth/preferences/totp_set',
                        '/auth/preferences/totp_send_link',
                        '/auth/preferences/multifactor_disable',
                        '/auth/preferences/multifactor_recovery',
                        '/auth/preferences/multifactor_recovery_regen',
                        '/auth/multifactor',
                        '/auth/do_multifactor',
                        ]:
                self.app.post(url,
                              {'password': 'foo', '_session_id': self.app.cookies['_session_id']},
                              status=404)

    def test_user_disabled(self):
        r = self.app.get('/auth/preferences/')
        info_html = str(r.html.find(attrs={'class': 'preferences multifactor'}))
        assert_in('disabled', info_html)

    def test_user_enabled(self):
        self._init_totp()
        r = self.app.get('/auth/preferences/')
        info_html = str(r.html.find(attrs={'class': 'preferences multifactor'}))
        assert_in('enabled', info_html)

    def test_reconfirm_auth(self):
        from datetime import datetime as real_datetime
        with patch('allura.lib.decorators.datetime') as datetime:
            datetime.min = real_datetime.min

            # reconfirm required at first
            datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 0, 0)
            r = self.app.get('/auth/preferences/totp_new')
            assert_in('Password Confirmation', r)

            # submit form, and its not required
            r.form['password'] = 'foo'
            r = r.form.submit()
            assert_not_in('Password Confirmation', r)

            # still not required
            datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 1, 45)
            r = self.app.get('/auth/preferences/totp_new')
            assert_not_in('Password Confirmation', r)

            # required later
            datetime.utcnow.return_value = real_datetime(2016, 1, 1, 0, 2, 3)
            r = self.app.get('/auth/preferences/totp_new')
            assert_in('Password Confirmation', r)

    def test_enable_totp(self):
        # create a separate session, for later use in the test
        other_session = TestController()
        other_session.setUp()
        other_session.app.get('/auth/preferences/')

        with out_audits(user=True):
            r = self.app.get('/auth/preferences/totp_new')
            assert_in('Password Confirmation', r)

        with audits('Visited multifactor new TOTP page', user=True):
            r.form['password'] = 'foo'
            r = r.form.submit()
            assert_in('Scan this barcode', r)

        first_key_shown = r.session['totp_new_key']

        with audits('Failed to set up multifactor TOTP \(wrong code\)', user=True):
            form = r.forms['totp_set']
            form['code'] = ''
            r = form.submit()
            assert_in('Invalid', r)
            assert_equal(first_key_shown, r.session['totp_new_key'])  # different keys on each pageload would be bad!

        new_totp = TotpService().Totp(r.session['totp_new_key'])
        code = new_totp.generate(time_time())
        form = r.forms['totp_set']
        form['code'] = code
        with audits('Set up multifactor TOTP', user=True):
            r = form.submit()
            assert_equal('Two factor authentication has now been set up.', json.loads(self.webflash(r))['message'],
                         self.webflash(r))

        tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
        assert_equal(len(tasks), 1)
        assert_equal(tasks[0].kwargs['subject'], 'Two-Factor Authentication Enabled')
        assert_in('new two-factor authentication', tasks[0].kwargs['text'])

        r = r.follow()
        assert_in('Recovery Codes', r)

        # Confirm any pre-existing sessions have to re-authenticate
        r = other_session.app.get('/auth/preferences/')
        assert_in('/auth/?return_to', r.headers['Location'])
        other_session.tearDown()

    def test_reset_totp(self):
        self._init_totp()

        # access page
        r = self.app.get('/auth/preferences/totp_new')
        assert_in('Password Confirmation', r)

        # reconfirm password to get to it
        r.form['password'] = 'foo'
        r = r.form.submit()

        # confirm warning message, and key is not changed yet
        assert_in('Scan this barcode', r)
        assert_in('this will invalidate your previous', r)
        current_key = TotpService.get().get_secret_key(M.User.query.get(username='test-admin'))
        assert_equal(self.sample_key, current_key)

        # incorrect submission
        form = r.forms['totp_set']
        form['code'] = ''
        r = form.submit()
        assert_in('Invalid', r)

        # still unchanged key
        current_key = TotpService.get().get_secret_key(M.User.query.get(username='test-admin'))
        assert_equal(self.sample_key, current_key)

        # valid submission
        new_key = r.session['totp_new_key']
        new_totp = TotpService().Totp(new_key)
        code = new_totp.generate(time_time())
        form = r.forms['totp_set']
        form['code'] = code
        r = form.submit()
        assert_equal('Two factor authentication has now been set up.', json.loads(self.webflash(r))['message'],
                     self.webflash(r))

        # new key in place
        current_key = TotpService.get().get_secret_key(M.User.query.get(username='test-admin'))
        assert_equal(new_key, current_key)
        assert_not_equal(self.sample_key, current_key)

    def test_disable(self):
        self._init_totp()

        self.app.get('/auth/preferences/multifactor_disable', status=405)  # GET not allowed

        # get form and submit
        r = self.app.get('/auth/preferences/')
        form = r.forms['multifactor_disable']
        r = form.submit()

        # confirm first, no change
        assert_in('Password Confirmation', r)
        user = M.User.query.get(username='test-admin')
        assert_equal(user.get_pref('multifactor'), True)

        # confirm submit, everything goes off
        r.form['password'] = 'foo'
        with audits('Disabled multifactor TOTP', user=True):
            r = r.form.submit()
            assert_equal('Multifactor authentication has now been disabled.', json.loads(self.webflash(r))['message'],
                         self.webflash(r))
        user = M.User.query.get(username='test-admin')
        assert_equal(user.get_pref('multifactor'), False)
        assert_equal(TotpService().get().get_secret_key(user), None)
        assert_equal(RecoveryCodeService().get().get_codes(user), [])

        # email confirmation
        tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
        assert_equal(len(tasks), 1)
        assert_equal(tasks[0].kwargs['subject'], 'Two-Factor Authentication Disabled')
        assert_in('disabled two-factor authentication', tasks[0].kwargs['text'])

    def test_login_totp(self):
        self._init_totp()

        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': str('True')}

        # regular login
        r = self.app.get('/auth/?return_to=/p/foo')
        encoded = self.app.antispam_field_names(r.form)
        r.form[encoded['username']] = 'test-admin'
        r.form[encoded['password']] = 'foo'
        with audits('Multifactor login - password ok, code not entered yet', user=True):
            r = r.form.submit()

        # check results
        assert r.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), r
        r = r.follow()
        assert not r.session.get('username')

        # try an invalid code
        r.form['code'] = 'invalid-code'
        with audits('Multifactor login - invalid code', user=True):
            r = r.form.submit()
        assert_in('Invalid code', r)
        assert not r.session.get('username')

        # use a valid code
        totp = TotpService().Totp(self.sample_key)
        code = totp.generate(time_time())
        r.form['code'] = code
        with audits('Successful login', user=True):
            r = r.form.submit()

        # confirm login and final page
        assert_equal(r.session['username'], 'test-admin')
        assert r.location.endswith('/p/foo'), r

    def test_login_rate_limit(self):
        self._init_totp()

        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': str('True')}

        # regular login
        r = self.app.get('/auth/?return_to=/p/foo')
        encoded = self.app.antispam_field_names(r.form)

        r.form[encoded['username']] = 'test-admin'
        r.form[encoded['password']] = 'foo'
        r = r.form.submit()
        r = r.follow()

        # try some invalid codes
        for i in range(3):
            r.form['code'] = 'invalid-code'
            r = r.form.submit()
            assert_in('Invalid code', r)

        # use a valid code, but it'll hit rate limit
        totp = TotpService().Totp(self.sample_key)
        code = totp.generate(time_time())
        r.form['code'] = code
        with audits('Multifactor login - rate limit', user=True):
            r = r.form.submit()

        assert_in('rate limit exceeded', r)
        assert not r.session.get('username')

    def test_login_totp_disrupted(self):
        self._init_totp()

        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': str('True')}

        # regular login
        r = self.app.get('/auth/')
        encoded = self.app.antispam_field_names(r.form)
        r.form[encoded['username']] = 'test-admin'
        r.form[encoded['password']] = 'foo'
        r = r.form.submit()
        r = r.follow()

        # go to some other page instead of filling out the 2FA code
        other_r = self.app.get('/')

        # then try to complete the 2FA form
        totp = TotpService().Totp(self.sample_key)
        code = totp.generate(time_time())
        r.form['code'] = code
        r = r.form.submit()

        # sent back to regular login
        assert_equal('Your multifactor login was disrupted, please start over.',
                     json.loads(self.webflash(r))['message'],
                     self.webflash(r))
        r = r.follow()
        assert_in('Password Login', r)

    def test_login_recovery_code(self):
        self._init_totp()

        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': str('True')}

        # regular login
        r = self.app.get('/auth/?return_to=/p/foo')
        encoded = self.app.antispam_field_names(r.form)
        r.form[encoded['username']] = 'test-admin'
        r.form[encoded['password']] = 'foo'
        r = r.form.submit()

        # check results
        assert r.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), r
        r = r.follow()
        assert not r.session.get('username')

        # change login mode
        r.form['mode'] = 'recovery'

        # try an invalid code
        r.form['code'] = 'invalid-code'
        r = r.form.submit()
        assert_in('Invalid code', r)
        assert not r.session.get('username')

        # use a valid code
        user = M.User.by_username('test-admin')
        recovery = RecoveryCodeService().get()
        recovery.regenerate_codes(user)
        recovery_code = recovery.get_codes(user)[0]
        r.form['code'] = recovery_code
        with audits('Logged in using a multifactor recovery code', user=True):
            r = r.form.submit()

        # confirm login and final page
        assert_equal(r.session['username'], 'test-admin')
        assert r.location.endswith('/p/foo'), r

        # confirm code used up
        assert_not_in(recovery_code, RecoveryCodeService().get().get_codes(user))

    @patch('allura.lib.plugin.AuthenticationProvider.hibp_password_check_enabled', Mock(return_value=True))
    def test_login_totp_with_hibp(self):
        # this is essentially the same as regular TOTP test, just making sure that HIBP doesn't get in the way
        # or cause any problems.  It shouldn't even run since a password isn't present when the final login happens

        self._init_totp()

        # so test-admin isn't automatically logged in for all requests
        self.app.extra_environ = {'disable_auth_magic': str('True')}

        # regular login
        r = self.app.get('/auth/?return_to=/p/foo')
        encoded = self.app.antispam_field_names(r.form)
        r.form[encoded['username']] = 'test-admin'
        r.form[encoded['password']] = 'foo'
        with audits('Multifactor login - password ok, code not entered yet', user=True):
            r = r.form.submit()

        # check results
        assert r.location.endswith('/auth/multifactor?return_to=%2Fp%2Ffoo'), r
        r = r.follow()
        assert not r.session.get('username')

        # use a valid code
        totp = TotpService().Totp(self.sample_key)
        code = totp.generate(time_time())
        r.form['code'] = code
        with audits('Successful login', user=True):
            r = r.form.submit()

        # confirm login and final page
        assert_equal(r.session['username'], 'test-admin')
        assert r.location.endswith('/p/foo'), r

    def test_view_key(self):
        self._init_totp()

        with out_audits(user=True):
            r = self.app.get('/auth/preferences/totp_view')
            assert_in('Password Confirmation', r)

        with audits('Viewed multifactor TOTP config page', user=True):
            r.form['password'] = 'foo'
            r = r.form.submit()
            assert_in('Scan this barcode', r)

    def test_view_recovery_codes_and_regen(self):
        self._init_totp()

        # reconfirm password
        with out_audits(user=True):
            r = self.app.get('/auth/preferences/multifactor_recovery')
            assert_in('Password Confirmation', r)

        # actual visit
        with audits('Viewed multifactor recovery codes', user=True):
            r.form['password'] = 'foo'
            r = r.form.submit()
            assert_in('Download', r)
            assert_in('Print', r)

        # regenerate codes
        with audits('Regenerated multifactor recovery codes', user=True):
            r = r.forms['multifactor_recovery_regen'].submit()

        # email confirmation
        tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
        assert_equal(len(tasks), 1)
        assert_equal(tasks[0].kwargs['subject'], 'Two-Factor Recovery Codes Regenerated')
        assert_in('regenerated', tasks[0].kwargs['text'])

    def test_send_links(self):
        r = self.app.get('/auth/preferences/totp_new')
        r.form['password'] = 'foo'
        r = r.form.submit()

        r = r.forms['totp_send_link'].submit()

        tasks = M.MonQTask.query.find(dict(task_name='allura.tasks.mail_tasks.sendsimplemail')).all()
        assert_equal(len(tasks), 1)
        assert_equal(tasks[0].kwargs['subject'], 'Two-Factor Authentication Apps')
        assert_in('itunes.apple.com', tasks[0].kwargs['text'])
        assert_in('play.google.com', tasks[0].kwargs['text'])
