| # -*- coding: utf-8 -*- |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| Model tests for artifact |
| """ |
| import re |
| from datetime import datetime |
| |
| from pylons import tmpl_context as c |
| from nose.tools import assert_raises, assert_equal |
| from nose import with_setup |
| from mock import patch |
| from ming.orm.ormsession import ThreadLocalORMSession |
| from ming.orm import Mapper |
| from bson import ObjectId |
| from webob import Request |
| |
| import allura |
| from allura import model as M |
| from allura.lib import helpers as h |
| from allura.lib import security |
| from allura.tests import decorators as td |
| from allura.websetup.schema import REGISTRY |
| from alluratest.controller import setup_basic_test, setup_unit_test |
| from forgewiki import model as WM |
| |
| |
| class Checkmessage(M.Message): |
| |
| class __mongometa__: |
| name = 'checkmessage' |
| |
| def url(self): |
| return '' |
| |
| def __init__(self, **kw): |
| super(Checkmessage, self).__init__(**kw) |
| if self.slug is not None and self.full_slug is None: |
| self.full_slug = datetime.utcnow().strftime( |
| '%Y%m%d%H%M%S') + ':' + self.slug |
| Mapper.compile_all() |
| |
| |
| def setUp(): |
| setup_basic_test() |
| setup_unit_test() |
| setup_with_tools() |
| |
| |
| @td.with_wiki |
| def setup_with_tools(): |
| h.set_context('test', 'wiki', neighborhood='Projects') |
| Checkmessage.query.remove({}) |
| WM.Page.query.remove({}) |
| WM.PageHistory.query.remove({}) |
| M.Shortlink.query.remove({}) |
| c.user = M.User.query.get(username='test-admin') |
| Checkmessage.project = c.project |
| Checkmessage.app_config = c.app.config |
| |
| |
| def tearDown(): |
| ThreadLocalORMSession.close_all() |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_artifact(): |
| pg = WM.Page(title='TestPage1') |
| assert pg.project == c.project |
| assert pg.project_id == c.project._id |
| assert pg.app.config == c.app.config |
| assert pg.app_config == c.app.config |
| u = M.User.query.get(username='test-user') |
| pr = M.ProjectRole.by_user(u, upsert=True) |
| ThreadLocalORMSession.flush_all() |
| REGISTRY.register(allura.credentials, allura.lib.security.Credentials()) |
| assert not security.has_access(pg, 'delete')(user=u) |
| pg.acl.append(M.ACE.allow(pr._id, 'delete')) |
| ThreadLocalORMSession.flush_all() |
| assert security.has_access(pg, 'delete')(user=u) |
| pg.acl.pop() |
| ThreadLocalORMSession.flush_all() |
| assert not security.has_access(pg, 'delete')(user=u) |
| |
| |
| def test_artifact_index(): |
| pg = WM.Page(title='TestPage1') |
| idx = pg.index() |
| assert 'title' in idx |
| assert 'url_s' in idx |
| assert 'project_id_s' in idx |
| assert 'mount_point_s' in idx |
| assert 'type_s' in idx |
| assert 'id' in idx |
| assert idx['id'] == pg.index_id() |
| assert 'text' in idx |
| assert 'TestPage' in pg.shorthand_id() |
| assert pg.link_text() == pg.shorthand_id() |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_artifactlink(): |
| pg = WM.Page(title='TestPage2') |
| q_shortlink = M.Shortlink.query.find(dict( |
| project_id=c.project._id, |
| app_config_id=c.app.config._id, |
| link=pg.shorthand_id())) |
| assert q_shortlink.count() == 0 |
| |
| ThreadLocalORMSession.flush_all() |
| M.MonQTask.run_ready() |
| ThreadLocalORMSession.flush_all() |
| assert q_shortlink.count() == 1 |
| |
| assert M.Shortlink.lookup('[TestPage2]') |
| assert M.Shortlink.lookup('[wiki:TestPage2]') |
| assert M.Shortlink.lookup('[test:wiki:TestPage2]') |
| assert not M.Shortlink.lookup('[test:wiki:TestPage2:foo]') |
| assert not M.Shortlink.lookup('[Wiki:TestPage2]') |
| assert not M.Shortlink.lookup('[TestPage2_no_such_page]') |
| |
| pg.delete() |
| c.project.uninstall_app('wiki') |
| assert not M.Shortlink.lookup('[wiki:TestPage2]') |
| assert q_shortlink.count() == 0 |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_gen_messageid(): |
| assert re.match(r'[0-9a-zA-Z]*.wiki@test.p.localhost', |
| h.gen_message_id()) |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_gen_messageid_with_id_set(): |
| oid = ObjectId() |
| assert re.match(r'%s.wiki@test.p.localhost' % |
| str(oid), h.gen_message_id(oid)) |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_artifact_messageid(): |
| p = WM.Page(title='T') |
| assert re.match(r'%s.wiki@test.p.localhost' % |
| str(p._id), p.message_id()) |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_versioning(): |
| pg = WM.Page(title='TestPage3') |
| with patch('allura.model.artifact.request', |
| Request.blank('/', remote_addr='1.1.1.1')): |
| pg.commit() |
| ThreadLocalORMSession.flush_all() |
| pg.text = 'Here is some text' |
| pg.commit() |
| ThreadLocalORMSession.flush_all() |
| ss = pg.get_version(1) |
| assert ss.author.logged_ip == '1.1.1.1' |
| assert ss.index()['is_history_b'] |
| assert ss.shorthand_id() == pg.shorthand_id() + '#1' |
| assert ss.title == pg.title |
| assert ss.text != pg.text |
| ss = pg.get_version(-1) |
| assert ss.index()['is_history_b'] |
| assert ss.shorthand_id() == pg.shorthand_id() + '#2' |
| assert ss.title == pg.title |
| assert ss.text == pg.text |
| assert_raises(IndexError, pg.get_version, 42) |
| pg.revert(1) |
| pg.commit() |
| ThreadLocalORMSession.flush_all() |
| assert ss.text != pg.text |
| assert pg.history().count() == 3 |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_messages_unknown_lookup(): |
| from bson import ObjectId |
| m = Checkmessage() |
| m.author_id = ObjectId() # something new |
| assert type(m.author()) == M.User, type(m.author()) |
| assert m.author() == M.User.anonymous() |
| |
| |
| @with_setup(setUp, tearDown) |
| @patch('allura.model.artifact.datetime') |
| def test_last_updated(_datetime): |
| c.project.last_updated = datetime(2014, 1, 1) |
| _datetime.utcnow.return_value = datetime(2014, 1, 2) |
| WM.Page(title='TestPage1') |
| ThreadLocalORMSession.flush_all() |
| assert_equal(c.project.last_updated, datetime(2014, 1, 2)) |
| |
| |
| @with_setup(setUp, tearDown) |
| @patch('allura.model.artifact.datetime') |
| def test_last_updated_disabled(_datetime): |
| c.project.last_updated = datetime(2014, 1, 1) |
| _datetime.utcnow.return_value = datetime(2014, 1, 2) |
| try: |
| M.artifact_orm_session._get().skip_last_updated = True |
| WM.Page(title='TestPage1') |
| ThreadLocalORMSession.flush_all() |
| assert_equal(c.project.last_updated, datetime(2014, 1, 1)) |
| finally: |
| M.artifact_orm_session._get().skip_last_updated = False |
| |
| |
| @with_setup(setUp, tearDown) |
| def test_get_discussion_thread_dupe(): |
| artif = WM.Page(title='TestSomeArtifact') |
| thr1 = artif.get_discussion_thread()[0] |
| thr1.post('thr1-post1') |
| thr1.post('thr1-post2') |
| thr2 = M.Thread.new(ref_id=thr1.ref_id) |
| thr2.post('thr2-post1') |
| thr2.post('thr2-post2') |
| thr2.post('thr2-post3') |
| thr3 = M.Thread.new(ref_id=thr1.ref_id) |
| thr3.post('thr3-post1') |
| thr4 = M.Thread.new(ref_id=thr1.ref_id) |
| |
| thread_q = M.Thread.query.find(dict(ref_id=artif.index_id())) |
| assert_equal(thread_q.count(), 4) |
| |
| thread = artif.get_discussion_thread()[0] # force cleanup |
| threads = thread_q.all() |
| assert_equal(len(threads), 1) |
| assert_equal(len(thread.posts), 6) |
| assert not any(p.deleted for p in thread.posts) # normal thread deletion propagates to children, make sure that doesn't happen |