# blob: 92ccaf569704cfe12fb09bf21772f5f1342fa91a [file] [log] [blame]
# coding: utf-8
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import shutil
import unittest
import pkg_resources
from itertools import count, product
from datetime import datetime
from zipfile import ZipFile
from collections import defaultdict
from pylons import tmpl_context as c, app_globals as g
import mock
from nose.tools import assert_equal
from datadiff.tools import assert_equals
import tg
import ming
from ming.base import Object
from ming.orm import session, ThreadLocalORMSession
from testfixtures import TempDirectory
from IPython.testing.decorators import onlyif
from alluratest.controller import setup_basic_test, setup_global_objects
from allura import model as M
from allura.model.repo_refresh import send_notifications
from allura.lib import helpers as h
from allura.webhooks import RepoPushWebhookSender
from allura.tests.model.test_repo import RepoImplTestBase
from forgesvn import model as SM
from forgesvn.model.svn import svn_path_exists
from forgesvn.tests import with_svn
from allura.tests.decorators import with_tool
class TestNewRepo(unittest.TestCase):
    """Checks against the HEAD commit of the ``testsvn`` fixture repository."""

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    def setup_with_tools(self):
        # Re-point the 'src' app's repository at the fixture data, refresh
        # it, and keep the HEAD commit around for the tests below.
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        fixtures_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        c.app.repo.name = 'testsvn'
        c.app.repo.fs_path = fixtures_dir
        self.repo = c.app.repo
        self.repo.refresh()
        self.rev = self.repo.commit('HEAD')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_last_commit_for(self):
        # Every entry listed for the root tree must name a commit author.
        for entry in self.rev.tree.ls():
            assert entry['last_commit']['author'] is not None

    def test_commit(self):
        rev = self.rev
        assert rev.primary() is rev
        assert rev.index_id().startswith('allura/model/repo/Commit#')
        # Bare attribute reads: nothing is compared, they only have to
        # evaluate without raising.
        rev.author_url
        rev.committer_url
        assert rev.tree._id == rev.tree_id
        assert rev.shorthand_id() == '[r6]'
        assert rev.symbolic_ids == ([], [])
        assert rev.url() == '/p/test/src/6/'

        history = list(self.repo.log(rev._id))
        assert len(history) == 6

        rev.tree.ls()
        assert rev.tree.readme() == (
            'README', 'This is readme\nAnother Line\n')
        assert rev.tree.path() == '/'
        assert rev.tree.url() == '/p/test/src/6/tree/'
        rev.tree.by_name['README']
        assert rev.tree.is_blob('README') == True
        assert rev.tree['a']['b']['c'].ls() == []
        with self.assertRaises(KeyError):
            rev.tree['a']['b']['d']

        assert_equal(rev.authored_user, None)
        assert_equal(rev.committed_user, None)

        expected_keys = [
            'id', 'url', 'timestamp', 'message', 'author',
            'committer', 'added', 'removed', 'modified', 'copied']
        assert_equal(
            sorted(rev.webhook_info.keys()),
            sorted(expected_keys))
class TestSVNRepo(unittest.TestCase, RepoImplTestBase):
    """Tests for the SVN repository model, run against fixture repositories.

    ``setup_with_tools`` provides ``self.repo`` (fixture ``testsvn``, mounted
    at ``src``) and ``self.svn_tags`` (fixture ``testsvn-trunk-tags-branches``,
    mounted at ``svn-tags``).
    """

    # Skip marker shared by all tarball tests: they only run when the
    # configured zip binary exists on this machine.
    _requires_zip = onlyif(
        os.path.exists(
            tg.config.get('scm.repos.tarball.zip_binary', '/usr/bin/zip')),
        'zip binary is missing')

    # Sorted archive member names expected for a trunk snapshot at r19.
    _TRUNK_SNAPSHOT = sorted(['test-svn-tags-19-trunk/',
                              'test-svn-tags-19-trunk/aaa.txt',
                              'test-svn-tags-19-trunk/bbb.txt',
                              'test-svn-tags-19-trunk/ccc.txt',
                              'test-svn-tags-19-trunk/README'])

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    @with_tool('test', 'SVN', 'svn-tags', 'SVN with tags')
    def setup_with_tools(self):
        setup_global_objects()
        repo_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        with h.push_context('test', 'src', neighborhood='Projects'):
            c.app.repo.name = 'testsvn'
            c.app.repo.fs_path = repo_dir
            self.repo = c.app.repo
            self.repo.refresh()
            ThreadLocalORMSession.flush_all()
            ThreadLocalORMSession.close_all()
        with h.push_context('test', 'svn-tags', neighborhood='Projects'):
            c.app.repo.name = 'testsvn-trunk-tags-branches'
            c.app.repo.fs_path = repo_dir
            self.svn_tags = c.app.repo
            self.svn_tags.refresh()
            ThreadLocalORMSession.flush_all()
            ThreadLocalORMSession.close_all()
        h.set_context('test', 'src', neighborhood='Projects')

    # ------------------------------------------------------------------
    # private helpers (names deliberately avoid the word "test" so test
    # collectors do not pick them up)
    # ------------------------------------------------------------------

    def _fresh_tmp_repo(self):
        """Return ``(repo, dirname)`` for a new 'testsvn' repo under g.tmpdir.

        Any directory left over at ``dirname`` is removed first; the
        repository itself is not initialised here.
        """
        repo = SM.Repository(
            name='testsvn',
            fs_path=g.tmpdir + '/',
            url_path='/test/',
            tool='svn',
            status='creating')
        dirname = os.path.join(repo.fs_path, repo.name)
        if os.path.exists(dirname):
            shutil.rmtree(dirname)
        return repo, dirname

    def _assert_clone_installs_hooks(self):
        """Clone the fixture repo into g.tmpdir and check the hook files."""
        repo, dirname = self._fresh_tmp_repo()
        repo_path = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/testsvn')
        repo.init()
        repo._impl.clone_from('file://' + repo_path)

        post_commit = os.path.join(g.tmpdir, 'testsvn/hooks/post-commit')
        assert not os.path.exists(
            os.path.join(g.tmpdir, 'testsvn/hooks/pre-revprop-change'))
        assert os.path.exists(post_commit)
        assert os.access(post_commit, os.X_OK)
        with open(post_commit) as f:
            hook_data = f.read()
        self.assertIn(
            'curl -s http://localhost/auth/refresh_repo/p/test/src/\n',
            hook_data)
        self.assertIn('exec $DIR/post-commit-user "$@"\n', hook_data)

        repo.refresh(notify=False)
        assert len(list(repo.log()))
        shutil.rmtree(dirname)

    @staticmethod
    def _log_entry(rev, parents, refs, when, who, message, size=None):
        """Build one expected ``repo.log(id_only=False)`` entry.

        The same date and name are used for both the 'authored' and the
        'committed' sub-dicts; 'email' is always the empty string.
        """
        return {
            'id': rev,
            'parents': parents,
            'refs': refs,
            'message': message,
            'rename_details': {},
            'size': size,
            'authored': {'date': when, 'name': who, 'email': ''},
            'committed': {'date': when, 'name': who, 'email': ''},
        }

    def _commit_for_rev(self, rev):
        """Return the commit object for the first log entry at ``rev``."""
        return self.repo.commit(next(self.repo.log(rev, id_only=True)))

    @staticmethod
    def _zip_names(path):
        """Return the member names of the zip at ``path``.

        The archive is closed before returning, so the file can be removed
        right afterwards.
        """
        with ZipFile(path, 'r') as archive:
            return archive.namelist()

    def _assert_snapshot(self, fn, expected_sorted):
        """Assert ``fn`` exists and its sorted members equal ``expected_sorted``."""
        assert os.path.isfile(fn), fn
        assert_equal(sorted(self._zip_names(fn)), expected_sorted)

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_init(self):
        repo, dirname = self._fresh_tmp_repo()
        repo.init()
        shutil.rmtree(dirname)

    def test_fork(self):
        self._assert_clone_installs_hooks()

    @mock.patch('forgesvn.model.svn.g.post_event')
    def test_clone(self, post_event):
        # Same scenario as test_fork, with g.post_event patched out.
        self._assert_clone_installs_hooks()

    @mock.patch('forgesvn.model.svn.tg')
    def test_can_hotcopy(self, tg):
        from forgesvn.model.svn import SVNImplementation
        func = SVNImplementation.can_hotcopy
        obj = mock.Mock(spec=SVNImplementation)
        combos = product(
            ['file:///myfile', 'http://myfile'],
            [True, False],
            ['version 1.7', 'version 1.6', 'version 2.0.3'])
        for source_url, hotcopy_enabled, stdout in combos:
            tg.config = {'scm.svn.hotcopy': hotcopy_enabled}
            obj.check_call.return_value = stdout, '', 0
            expected = (source_url.startswith('file://') and
                        tg.config['scm.svn.hotcopy'] and
                        stdout != 'version 1.6')
            result = func(obj, source_url)
            assert result == expected

    def test_index(self):
        i = self.repo.index()
        assert i['type_s'] == 'SVN Repository', i

    def test_log_id_only(self):
        entries = list(self.repo.log(id_only=True))
        assert_equal(entries, [6, 5, 4, 3, 2, 1])

    def test_log(self):
        entry = self._log_entry
        entries = list(self.repo.log(id_only=False))
        assert_equal(entries, [
            entry(6, [5], ['HEAD'],
                  datetime(2013, 11, 8, 13, 38, 11, 152821),
                  u'coldmind', u''),
            entry(5, [4], [],
                  datetime(2010, 11, 18, 20, 14, 21, 515743),
                  u'rick446', u'Copied a => b'),
            entry(4, [3], [],
                  datetime(2010, 10, 8, 15, 32, 59, 383719),
                  u'rick446', u'Remove hello.txt'),
            entry(3, [2], [],
                  datetime(2010, 10, 8, 15, 32, 48, 272296),
                  u'rick446', u'Modify readme'),
            entry(2, [1], [],
                  datetime(2010, 10, 8, 15, 32, 36, 221863),
                  u'rick446', u'Add path'),
            entry(1, [], [],
                  datetime(2010, 10, 8, 15, 32, 7, 238375),
                  u'rick446', u'Create readme'),
        ])

    def test_log_file(self):
        entry = self._log_entry
        entries = list(self.repo.log(path='/README', id_only=False))
        assert_equal(entries, [
            entry(3, [2], [],
                  datetime(2010, 10, 8, 15, 32, 48, 272296),
                  u'rick446', u'Modify readme', size=28),
            entry(1, [], [],
                  datetime(2010, 10, 8, 15, 32, 7, 238375),
                  u'rick446', u'Create readme', size=15),
        ])

    def test_is_file(self):
        assert self.repo.is_file('/README')
        assert not self.repo.is_file('/a')

    def test_paged_diffs(self):
        entry = self._commit_for_rev(2)
        self.assertEqual(entry.diffs, entry.paged_diffs())
        self.assertEqual(entry.diffs, entry.paged_diffs(start=0))
        added_expected = entry.diffs.added[1:3]
        expected = dict(
            copied=[], changed=[], removed=[],
            added=added_expected, total=4)
        actual = entry.paged_diffs(start=1, end=3)
        self.assertEqual(expected, actual)

        fake_id = self.repo._impl._oid(100)
        empty = M.repository.Commit(_id=fake_id, repo=self.repo).paged_diffs()
        self.assertEqual(sorted(actual.keys()), sorted(empty.keys()))

    def test_diff_create_file(self):
        entry = self._commit_for_rev(1)
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=[],
                removed=[], added=['/README'], total=1))

    def test_diff_create_path(self):
        entry = self._commit_for_rev(2)
        actual = entry.diffs
        actual.added = sorted(actual.added)
        # Compare the object whose 'added' list was just sorted, rather than
        # reading entry.diffs a second time.
        self.assertEqual(
            actual, dict(
                copied=[], changed=[], removed=[],
                added=sorted([
                    '/a', '/a/b', '/a/b/c',
                    '/a/b/c/hello.txt']), total=4))

    def test_diff_modify_file(self):
        entry = self._commit_for_rev(3)
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=['/README'],
                removed=[], added=[], total=1))

    def test_diff_delete(self):
        entry = self._commit_for_rev(4)
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=[],
                removed=['/a/b/c/hello.txt'], added=[], total=1))

    def test_diff_copy(self):
        entry = self._commit_for_rev(5)
        assert_equals(dict(entry.diffs), dict(
            copied=[{'new': u'/b', 'old': u'/a', 'diff': '', 'ratio': 1}],
            changed=[], removed=[], added=[], total=1))

    def test_commit(self):
        entry = self.repo.commit(1)
        assert entry.committed.name == 'rick446'
        assert entry.message

    def test_svn_path_exists(self):
        repo_path = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/testsvn')
        assert svn_path_exists("file://%s/a" % repo_path)
        assert svn_path_exists("file://%s" % repo_path)
        assert not svn_path_exists("file://%s/badpath" % repo_path)
        with mock.patch('forgesvn.model.svn.pysvn') as pysvn:
            svn_path_exists('dummy')
            pysvn.Client.return_value.info2.assert_called_once_with(
                'dummy',
                revision=pysvn.Revision.return_value,
                recurse=False)

    @_requires_zip
    def test_tarball(self):
        tmpdir = tg.config['scm.repos.tarball.root']
        zip_path = os.path.join(
            tmpdir, 'svn/t/te/test/testsvn/test-src-1.zip')
        assert_equal(self.repo.tarball_path,
                     os.path.join(tmpdir, 'svn/t/te/test/testsvn'))
        assert_equal(self.repo.tarball_url('1'),
                     'file:///svn/t/te/test/testsvn/test-src-1.zip')
        self.repo.tarball('1')
        assert os.path.isfile(zip_path)
        assert_equal(self._zip_names(zip_path),
                     ['test-src-1/', 'test-src-1/README'])
        shutil.rmtree(self.repo.tarball_path.encode('utf-8'),
                      ignore_errors=True)

    @_requires_zip
    def test_tarball_aware_of_tags(self):
        rev = '19'
        tag_content = sorted(['test-svn-tags-19-tags-tag-1.0/',
                              'test-svn-tags-19-tags-tag-1.0/svn-commit.tmp',
                              'test-svn-tags-19-tags-tag-1.0/README'])
        h.set_context('test', 'svn-tags', neighborhood='Projects')
        tmpdir = tg.config['scm.repos.tarball.root']
        tarball_path = os.path.join(
            tmpdir, 'svn/t/te/test/testsvn-trunk-tags-branches/')

        fn = tarball_path + 'test-svn-tags-19-tags-tag-1.0.zip'
        self.svn_tags.tarball(rev, '/tags/tag-1.0/')
        self._assert_snapshot(fn, tag_content)
        os.remove(fn)

        self.svn_tags.tarball(rev, '/tags/tag-1.0/some/path/')
        self._assert_snapshot(fn, tag_content)
        os.remove(fn)

        # if inside of tags, but no tag is specified
        # expect snapshot of trunk
        fn = tarball_path + 'test-svn-tags-19-trunk.zip'
        self.svn_tags.tarball(rev, '/tags/')
        self._assert_snapshot(fn, self._TRUNK_SNAPSHOT)
        shutil.rmtree(tarball_path, ignore_errors=True)

    @_requires_zip
    def test_tarball_aware_of_branches(self):
        rev = '19'
        branch_content = sorted(['test-svn-tags-19-branches-aaa/',
                                 'test-svn-tags-19-branches-aaa/aaa.txt',
                                 'test-svn-tags-19-branches-aaa/svn-commit.tmp',
                                 'test-svn-tags-19-branches-aaa/README'])
        h.set_context('test', 'svn-tags', neighborhood='Projects')
        tmpdir = tg.config['scm.repos.tarball.root']
        tarball_path = os.path.join(
            tmpdir, 'svn/t/te/test/testsvn-trunk-tags-branches/')

        fn = tarball_path + 'test-svn-tags-19-branches-aaa.zip'
        self.svn_tags.tarball(rev, '/branches/aaa/')
        self._assert_snapshot(fn, branch_content)
        os.remove(fn)

        self.svn_tags.tarball(rev, '/branches/aaa/some/path/')
        self._assert_snapshot(fn, branch_content)
        os.remove(fn)

        # if inside of branches, but no branch is specified
        # expect snapshot of trunk
        fn = tarball_path + 'test-svn-tags-19-trunk.zip'
        self.svn_tags.tarball(rev, '/branches/')
        self._assert_snapshot(fn, self._TRUNK_SNAPSHOT)
        shutil.rmtree(tarball_path, ignore_errors=True)

    @_requires_zip
    def test_tarball_aware_of_trunk(self):
        rev = '19'
        trunk_content = self._TRUNK_SNAPSHOT
        h.set_context('test', 'svn-tags', neighborhood='Projects')
        tmpdir = tg.config['scm.repos.tarball.root']
        tarball_path = os.path.join(
            tmpdir, 'svn/t/te/test/testsvn-trunk-tags-branches/')
        fn = tarball_path + 'test-svn-tags-19-trunk.zip'

        self.svn_tags.tarball(rev, '/trunk/')
        self._assert_snapshot(fn, trunk_content)
        os.remove(fn)

        self.svn_tags.tarball(rev, '/trunk/some/path/')
        self._assert_snapshot(fn, trunk_content)
        os.remove(fn)

        # no path, but there are trunk in the repo
        # expect snapshot of trunk
        self.svn_tags.tarball(rev)
        self._assert_snapshot(fn, trunk_content)
        os.remove(fn)

        # no path, and no trunk dir
        # expect snapshot of repo root
        h.set_context('test', 'src', neighborhood='Projects')
        fn = os.path.join(tmpdir, 'svn/t/te/test/testsvn/test-src-1.zip')
        self.repo.tarball('1')
        assert os.path.isfile(fn), fn
        assert_equal(self._zip_names(fn),
                     ['test-src-1/', 'test-src-1/README'])
        shutil.rmtree(os.path.join(tmpdir, 'svn/t/te/test/testsvn/'),
                      ignore_errors=True)
        shutil.rmtree(tarball_path, ignore_errors=True)

    def test_is_empty(self):
        assert not self.repo.is_empty()
        with TempDirectory() as d:
            repo2 = SM.Repository(
                name='test',
                fs_path=d.path,
                url_path='/test/',
                tool='svn',
                status='creating')
            repo2.init()
            assert repo2.is_empty()
            repo2.refresh()
            ThreadLocalORMSession.flush_all()
            assert repo2.is_empty()

    def test_webhook_payload(self):
        sender = RepoPushWebhookSender()
        cids = list(self.repo.all_commit_ids())[:2]
        payload = sender.get_payload(commit_ids=cids)
        expected_payload = {
            'size': 2,
            'after': 'r6',
            'before': 'r4',
            'commits': [{
                'id': u'r6',
                'url': u'http://localhost/p/test/src/6/',
                'timestamp': datetime(2013, 11, 8, 13, 38, 11, 152000),
                'message': u'',
                'author': {'name': u'coldmind',
                           'email': u'',
                           'username': u''},
                'committer': {'name': u'coldmind',
                              'email': u'',
                              'username': u''},
                'added': [u'/ЗРЯЧИЙ_ТА_ПОБАЧИТЬ'],
                'removed': [],
                'modified': [],
                'copied': []
            }, {
                'id': u'r5',
                'url': u'http://localhost/p/test/src/5/',
                'timestamp': datetime(2010, 11, 18, 20, 14, 21, 515000),
                'message': u'Copied a => b',
                'author': {'name': u'rick446',
                           'email': u'',
                           'username': u''},
                'committer': {'name': u'rick446',
                              'email': u'',
                              'username': u''},
                'added': [],
                'removed': [],
                'modified': [],
                'copied': [
                    {'new': u'/b', 'old': u'/a', 'diff': '', 'ratio': 1},
                ],
            }],
            'repository': {
                'name': u'SVN',
                'full_name': u'/p/test/src/',
                'url': u'http://localhost/p/test/src/',
            },
        }
        assert_equals(payload, expected_payload)
class TestSVNRev(unittest.TestCase):
    """Checks against revision 1 of the ``testsvn`` fixture repository."""

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    def setup_with_tools(self):
        # Re-point the 'src' app's repository at the fixture data, refresh
        # it, and keep the commit for revision 1.
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        fixtures_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        c.app.repo.name = 'testsvn'
        c.app.repo.fs_path = fixtures_dir
        self.repo = c.app.repo
        self.repo.refresh()
        self.rev = self.repo.commit(1)
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_url(self):
        assert self.rev.url().endswith('/1/')

    def test_primary(self):
        assert self.rev.primary() == self.rev

    def test_shorthand(self):
        assert self.rev.shorthand_id() == '[r1]'

    def test_diff(self):
        # Nothing is asserted; the diff lists only have to concatenate and
        # print without raising.
        changes = self.rev.diffs
        combined = (changes.added
                    + changes.removed
                    + changes.changed
                    + changes.copied)
        for item in combined:
            print(item)

    def _oid(self, rev_id):
        """Format ``rev_id`` as ``'<repo _id>:<rev_id>'``."""
        return '%s:%s' % (self.repo._id, rev_id)

    def test_log(self):
        # path only
        head = self.repo.head

        def ids(*args):
            return list(self.repo.log(*args, id_only=True))

        assert_equal(ids(head), [6, 5, 4, 3, 2, 1])
        assert_equal(ids(head, 'README'), [3, 1])
        assert_equal(ids(1, 'README'), [1])
        assert_equal(ids(head, 'a/b/c/'), [4, 2])
        assert_equal(ids(3, 'a/b/c/'), [2])
        assert_equal(ids(head, 'does/not/exist'), [])

    def test_notification_email(self):
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        fixtures_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        self.repo = SM.Repository(
            name='testsvn',
            fs_path=fixtures_dir,
            url_path='/test/',
            tool='svn',
            status='creating')
        self.repo.refresh()
        ThreadLocalORMSession.flush_all()
        send_notifications(self.repo, [self.repo.rev_to_commit_id(1)])
        ThreadLocalORMSession.flush_all()

        query = dict(subject='[test:src] [r1] - rick446: Create readme')
        notification = M.Notification.query.find(query).first()
        assert notification
        assert_equal(notification.text,
                     'Create readme http://localhost/p/test/src/1/')
class _Test(unittest.TestCase):
    """Base class with helpers that build Tree and Commit fixture objects."""

    # One generator shared through the class; yields 'obj_0', 'obj_1', ...
    idgen = ('obj_%d' % i for i in count())

    def _make_tree(self, object_id, **kwargs):
        """Upsert a Tree for ``object_id`` and populate it from ``kwargs``.

        A string value adds a Blob entry under that name; any other value is
        expanded as keyword arguments for a nested ``_make_tree`` call.
        Returns the tree after flushing its session.
        """
        tree, _ = M.repository.Tree.upsert(object_id)
        tree.repo = getattr(self, 'repo', None)
        for name, value in kwargs.iteritems():
            if not isinstance(value, basestring):
                child = self._make_tree(next(self.idgen), **value)
                tree.tree_ids.append(Object(name=name, id=child._id))
                continue
            blob = M.repository.Blob(tree, name, next(self.idgen))
            tree.blob_ids.append(Object(name=name, id=blob._id))
        session(tree).flush()
        return tree

    def _make_commit(self, object_id, **tree_parts):
        """Upsert a Commit for ``object_id``; return ``(commit, isnew)``.

        The commit is only filled in (emails, date, message, context, tree)
        when the upsert reports it as new.
        """
        ci, isnew = M.repository.Commit.upsert(object_id)
        if not isnew:
            return ci, isnew

        email = c.user.email_addresses[0]
        ci.committed.email = email
        ci.authored.email = email
        now = datetime.utcnow()
        # BSON datetime resolution is to 1 millisecond, not 1 microsecond
        # like Python. Round this now so it'll match the value that's
        # pulled from MongoDB in the tests.
        millis = now.microsecond // 1000
        ci.authored.date = now.replace(microsecond=millis * 1000)
        ci.message = 'summary\n\nddescription'
        ci.set_context(self.repo)
        ci.tree_id = 't_' + object_id
        ci.tree = self._make_tree(ci.tree_id, **tree_parts)
        return ci, isnew

    def _make_log(self, ci):
        """Flush ``ci`` and run, then clean up, a CommitRunBuilder for it."""
        session(ci).flush(ci)
        builder = M.repo_refresh.CommitRunBuilder([ci._id])
        builder.run()
        builder.cleanup()

    def setUp(self):
        setup_basic_test()
        setup_global_objects()
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()
        self.prefix = tg.config.get('scm.repos.root', '/')
class _TestWithRepo(_Test):
    """Adds ``self.repo``: an ``M.Repository`` named 'test1' with a mocked ``_impl``."""

    def setUp(self):
        super(_TestWithRepo, self).setUp()
        h.set_context('test', neighborhood='Projects')
        c.project.install_app('svn', 'test1')
        h.set_context('test', 'test1', neighborhood='Projects')

        self.repo = M.Repository(name='test1', tool='svn')
        self.repo._impl = mock.Mock(spec=M.RepositoryImplementation())

        def url_for_commit(*args, **kwargs):
            # Delegate to the real implementation, passing the mock as the
            # instance; self.repo._impl is looked up on every call.
            return M.RepositoryImplementation.url_for_commit(
                self.repo._impl, *args, **kwargs)

        def no_commit_ids(*args, **kwargs):
            return []

        self.repo._impl.shorthand_for_commit = (
            M.RepositoryImplementation.shorthand_for_commit)
        self.repo._impl.url_for_commit = url_for_commit
        self.repo._impl._repo = self.repo
        self.repo._impl.all_commit_ids = no_commit_ids
        self.repo._impl.commit().symbolic_ids = None
        ThreadLocalORMSession.flush_all()
class _TestWithRepoAndCommit(_TestWithRepo):
    """Adds ``self.ci``: the commit built by ``_make_commit('foo')``."""

    def setUp(self):
        super(_TestWithRepoAndCommit, self).setUp()
        commit, _ = self._make_commit('foo')
        self.ci = commit
        ThreadLocalORMSession.flush_all()
        # ThreadLocalORMSession.close_all()
class TestRepo(_TestWithRepo):
    """Tests for ``M.Repository`` using the mocked implementation from the base."""

    def test_create(self):
        assert self.repo.fs_path == os.path.join(self.prefix, 'svn/p/test/')
        assert self.repo.url_path == '/p/test/'
        assert self.repo.full_fs_path == os.path.join(
            self.prefix, 'svn/p/test/test1')

    def test_passthrough(self):
        # Each listed repo method must be forwarded to _impl with the same
        # arguments.
        for fn in ['init']:
            getattr(self.repo, fn)()
            getattr(self.repo._impl, fn).assert_called_with()
        for fn in ['commit', 'open_blob']:
            getattr(self.repo, fn)('foo')
            getattr(self.repo._impl, fn).assert_called_with('foo')

    def test_shorthand_for_commit(self):
        self.assertEqual(
            self.repo.shorthand_for_commit('a' * 40),
            '[aaaaaa]')

    def test_url_for_commit(self):
        self.assertEqual(
            self.repo.url_for_commit('a' * 40),
            '/p/test/test1/ci/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa/')

    @mock.patch('allura.model.repository.g.post_event')
    def test_init_as_clone(self, post_event):
        self.repo.init_as_clone('srcpath', 'srcname', 'srcurl')
        assert self.repo.upstream_repo.name == 'srcname'
        assert self.repo.upstream_repo.url == 'srcurl'
        # ``called_with`` is not a Mock assertion: reading it just creates a
        # truthy child mock, so ``assert ...called_with(...)`` can never
        # fail.  ``assert_called_with`` actually checks the call arguments.
        self.repo._impl.clone_from.assert_called_with('srcpath')
        post_event.assert_called_once_with('repo_cloned', 'srcurl', 'srcpath')

    def test_latest(self):
        ci = mock.Mock()
        self.repo._impl.commit = mock.Mock(return_value=ci)
        assert self.repo.latest() is ci

    def test_index(self):
        i = self.repo.index()
        assert i['type_s'] == 'Repository', i
        assert i['name_s'] == 'test1', i

    def test_scm_host_url(self):
        assert (
            self.repo.clone_url('rw', 'nobody')
            == 'svn+ssh://nobody@localhost:8022/scm-repo/p/test/test1/'),\
            self.repo.clone_url('rw', 'nobody')
        assert (
            self.repo.clone_url('https', 'nobody')
            == 'https://nobody@localhost:8022/scm-repo/p/test/test1/'),\
            self.repo.clone_url('https', 'nobody')
        with h.push_config(self.repo.app.config.options,
                           external_checkout_url='https://$username@foo.com/'):
            assert_equal(
                self.repo.clone_url('https', 'user'),
                'https://user@foo.com/')

    def test_merge_request(self):
        M.MergeRequest.upsert(app_config_id=c.app.config._id, status='open')
        M.MergeRequest.upsert(app_config_id=c.app.config._id, status='closed')
        session(M.MergeRequest).flush()
        session(M.MergeRequest).clear()
        assert self.repo.merge_requests_by_statuses('open').count() == 1
        assert self.repo.merge_requests_by_statuses('closed').count() == 1
        assert self.repo.merge_requests_by_statuses(
            'open', 'closed').count() == 2

    def test_guess_type(self):
        assert self.repo.guess_type('foo.txt') == ('text/plain', None)
        assert self.repo.guess_type('foo.gbaer') == (
            'application/octet-stream', None)
        assert self.repo.guess_type('foo.html') == ('text/html', None)
        assert self.repo.guess_type('.gitignore') == ('text/plain', None)

    def test_refresh(self):
        committer_name = 'Test Committer'
        committer_email = 'test@example.com'
        ci = mock.Mock()
        ci.authored.name = committer_name
        ci.committed.name = committer_name
        ci.committed.email = committer_email
        ci.author_url = '/u/test-committer/'
        ci.activity_name = '[deadbeef]'
        ci.activity_url = 'url'
        ci.activity_extras = {}
        # Deleting the attribute makes later reads of ci.node_id raise
        # AttributeError instead of returning an auto-created child mock.
        del ci.node_id
        self.repo._impl.commit = mock.Mock(return_value=ci)
        self.repo._impl.new_commits = mock.Mock(
            return_value=['foo%d' % i for i in range(100)])
        self.repo._impl.all_commit_ids = mock.Mock(
            return_value=['foo%d' % i for i in range(100)])
        self.repo.symbolics_for_commit = mock.Mock(
            return_value=[['master', 'branch'], []])

        def refresh_commit_info(oid, seen, lazy=False):
            M.repository.CommitDoc(dict(
                authored=dict(
                    name=committer_name,
                    email=committer_email),
                _id=oid)).m.insert()
        self.repo._impl.refresh_commit_info = refresh_commit_info

        def _id(oid):
            return getattr(oid, '_id', str(oid))
        self.repo.shorthand_for_commit = lambda oid: '[' + _id(oid) + ']'
        self.repo.url_for_commit = lambda oid: '/ci/' + _id(oid) + '/'

        self.repo.refresh()
        ThreadLocalORMSession.flush_all()
        notifications = M.Notification.query.find().all()
        for n in notifications:
            if '100 new commits' in n.subject:
                assert "master,branch: by %s http://localhost/ci/foo99" % committer_name in n.text
                break
        else:
            assert False, 'Did not find notification'
        assert M.Feed.query.find(dict(
            author_name=committer_name)).count() == 100

    def test_refresh_private(self):
        ci = mock.Mock()
        self.repo._impl.commit = mock.Mock(return_value=ci)
        self.repo._impl.new_commits = mock.Mock(
            return_value=['foo%d' % i for i in range(100)])
        # make unreadable by *anonymous, so additional notification logic
        # executes
        self.repo.acl = []
        c.project.acl = []
        self.repo.refresh()

    def test_push_upstream_context(self):
        self.repo.init_as_clone('srcpath', '/p/test/svn/', '/p/test/svn/')
        old_app_instance = M.Project.app_instance
        try:
            # Temporarily replace Project.app_instance with a stub; the
            # finally clause puts the saved attribute back.
            M.Project.app_instance = mock.Mock(return_value=ming.base.Object(
                config=ming.base.Object(_id=None)))
            with self.repo.push_upstream_context():
                assert c.project.shortname == 'test'
        finally:
            M.Project.app_instance = old_app_instance

    def test_pending_upstream_merges(self):
        self.repo.init_as_clone('srcpath', '/p/test/svn/', '/p/test/svn/')
        old_app_instance = M.Project.app_instance
        try:
            # Same stubbing as in test_push_upstream_context.
            M.Project.app_instance = mock.Mock(return_value=ming.base.Object(
                config=ming.base.Object(_id=None)))
            self.repo.pending_upstream_merges()
        finally:
            M.Project.app_instance = old_app_instance
class TestMergeRequest(_TestWithRepoAndCommit):
    """Merge-request tests using a second repo ('test2') set up as a clone of 'test1'."""

    def setUp(self):
        super(TestMergeRequest, self).setUp()
        c.project.install_app('svn', 'test2')
        h.set_context('test', 'test2', neighborhood='Projects')

        self.repo2 = M.Repository(name='test2', tool='svn')
        self.repo2._impl = mock.Mock(spec=M.RepositoryImplementation())
        self.repo2._impl.log = lambda *a, **kw: (['foo'], [])
        self.repo2._impl.all_commit_ids = lambda *a, **kw: []
        self.repo2._impl._repo = self.repo2
        self.repo2.init_as_clone('/p/test/', 'test1', '/p/test/test1/')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_upsert(self):
        h.set_context('test', 'test1', neighborhood='Projects')
        downstream = ming.base.Object(
            project_id=c.project._id,
            mount_point='test2',
            commit_id='foo:2')
        mr = M.MergeRequest.upsert(
            downstream=downstream,
            target_branch='foobranch',
            summary='summary',
            description='description')

        admin = M.User.by_username('test-admin')
        assert_equal(mr.creator, admin)
        assert_equal(mr.creator_name, admin.get_pref('display_name'))
        assert_equal(mr.creator_url, admin.url())
        assert_equal(mr.downstream_url, '/p/test/test2/')
        assert_equal(mr.downstream_repo_url,
                     'http://svn.localhost/p/test/test2/')

        repo_url = 'file:///tmp/svn/p/test/test2'
        with mock.patch('forgesvn.model.svn.SVNLibWrapper') as _svn,\
                mock.patch('forgesvn.model.svn.SVNImplementation._map_log') as _map_log:
            mr.app.repo._impl.head = 1
            _svn().log.return_value = [mock.Mock(revision=mock.Mock(number=2))]
            _map_log.return_value = 'bar'
            assert_equal(mr.commits, ['bar'])
            # can't do assert_called_once_with because pysvn.Revision doesn't
            # compare nicely
            assert_equal(_svn().log.call_count, 1)
            log_args, log_kwargs = _svn().log.call_args
            assert_equal(log_args, (repo_url,))
            assert_equal(log_kwargs['revision_start'].number, 2)
            assert_equal(log_kwargs['limit'], 25)
            _map_log.assert_called_once_with(
                _svn().log.return_value[0], repo_url, None)
class TestRepoObject(_TestWithRepoAndCommit):
    """Checks on upsert results and on artifact methods of the 'foo' commit."""

    def test_upsert(self):
        # Upserting the same id twice yields the same object; only the first
        # call reports it as new.
        first, first_is_new = M.repository.Tree.upsert('foo1')
        second, second_is_new = M.repository.Tree.upsert('foo1')
        assert first is second
        assert first_is_new and not second_is_new

    def test_artifact_methods(self):
        assert self.ci.index_id(
        ) == 'allura/model/repo/Commit#foo', self.ci.index_id()
        assert self.ci.primary() is self.ci, self.ci.primary()
class TestCommit(_TestWithRepo):
    """Tests for Commit objects built by ``_make_commit`` on the mocked repo."""

    def setUp(self):
        super(TestCommit, self).setUp()
        commit, _ = self._make_commit(
            'foo',
            a=dict(
                a=dict(
                    a='',
                    b='',),
                b=''))
        self.ci = commit
        self.tree = self.ci.tree
        # Route shorthand/url formatting through a RepositoryImplementation
        # instance bound to this repo.
        impl = M.RepositoryImplementation()
        impl._repo = self.repo
        self.repo._impl.shorthand_for_commit = impl.shorthand_for_commit
        self.repo._impl.url_for_commit = impl.url_for_commit

    def test_upsert(self):
        first, _ = M.repository.Commit.upsert('foo')
        second, second_is_new = M.repository.Commit.upsert('foo')
        assert first is second
        assert not second_is_new
        admin = M.User.by_username('test-admin')
        assert self.ci.author_url == admin.url()
        assert self.ci.committer_url == admin.url()
        assert self.ci.tree is self.tree
        assert self.ci.summary == 'summary'
        assert self.ci.shorthand_id() == '[foo]'
        assert self.ci.url() == '/p/test/test1/ci/foo/'

    def test_get_path(self):
        leaf = self.ci.get_path('a/a/a')
        assert isinstance(leaf, M.repository.Blob)
        subtree = self.ci.get_path('a/a')
        assert isinstance(subtree, M.repository.Tree)

    def _unique_blobs(self):
        """Return an ``open_blob`` stand-in giving each distinct path its own content.

        The first path seen maps to '1', the next new path to '2', and so
        on; asking for the same path again returns the same number.
        """
        from cStringIO import StringIO
        sequence = count(1)
        numbers = defaultdict(lambda: next(sequence))

        def open_blob(blob):
            return StringIO(str(numbers[blob.path()]))
        return open_blob

    def test_diffs_file_renames(self):
        from cStringIO import StringIO
        contents = {
            u'a': u'Leia',
            u'/b/a/a': u'Darth Vader',
            u'/b/a/b': u'Luke Skywalker',
            u'/b/b': u'Death Star will destroy you',
            u'/b/c': u'Luke Skywalker',  # moved from /b/a/b
            # moved from /b/b and modified
            u'/b/a/z': u'Death Star will destroy you\nALL',
        }

        def open_blob(blob):
            return StringIO(contents.get(blob.path(), ''))

        self.repo._impl.open_blob = open_blob
        self.repo._impl.commit = mock.Mock(return_value=self.ci)

        # Commit 'foo': everything under 'a' is added.
        self.repo._impl.paged_diffs.return_value = {
            'added': ['a', 'a/a', 'a/a/a', 'a/a/b', 'a/b'],
            'changed': [],
            'removed': [],
            'total': 5,
        }
        M.repo_refresh.refresh_commit_trees(self.ci, {})
        assert_equal(self.ci.diffs.added,
                     ['a', 'a/a', 'a/a/a', 'a/a/b', 'a/b'])
        assert (self.ci.diffs.copied
                == self.ci.diffs.changed
                == self.ci.diffs.removed
                == [])

        # Commit 'bar': the 'a' tree is removed and a 'b' tree is added.
        ci, _ = self._make_commit(
            'bar',
            b=dict(
                a=dict(
                    a='',
                    b='',),
                b=''))
        ci.parent_ids = ['foo']
        self._make_log(ci)
        self.repo._impl.paged_diffs.return_value = {
            'added': ['b', 'b/a', 'b/a/a', 'b/a/b', 'b/b'],
            'changed': [],
            'removed': ['a', 'a/a', 'a/a/a', 'a/a/b', 'a/b'],
            'total': 10,
        }
        M.repo_refresh.refresh_commit_trees(ci, {})
        assert_equal(ci.diffs.added, ['b', 'b/a', 'b/a/a', 'b/a/b', 'b/b'])
        assert_equal(ci.diffs.removed, ['a', 'a/a', 'a/a/a', 'a/a/b', 'a/b'])
        assert (ci.diffs.copied
                == ci.diffs.changed
                == [])

        # Commit 'baz': two removed paths are expected to be reported as
        # copies of the two added paths.
        ci, _ = self._make_commit(
            'baz',
            b=dict(
                a=dict(
                    z=''),
                c=''))
        ci.parent_ids = ['bar']
        self._make_log(ci)
        self.repo._impl.paged_diffs.return_value = {
            'added': ['b/c', 'b/a/z'],
            'changed': [],
            'removed': ['b/a/b', 'b/b', 'b/a/a'],
            'total': 10,
        }
        M.repo_refresh.refresh_commit_trees(ci, {})
        assert_equal(ci.diffs.added, [])
        assert_equal(ci.diffs.changed, [])
        assert_equal(ci.diffs.removed, ['b/a/a'])
        # see mock for open_blob
        assert_equal(len(ci.diffs.copied), 2)
        assert_equal(ci.diffs.copied[0]['old'], 'b/a/b')
        assert_equal(ci.diffs.copied[0]['new'], 'b/c')
        assert_equal(ci.diffs.copied[0]['ratio'], 1)
        assert_equal(ci.diffs.copied[0]['diff'], '')
        assert_equal(ci.diffs.copied[1]['old'], 'b/b')
        assert_equal(ci.diffs.copied[1]['new'], 'b/a/z')
        assert ci.diffs.copied[1]['ratio'] < 1, ci.diffs.copied[1]['ratio']
        assert '+++' in ci.diffs.copied[1]['diff'], ci.diffs.copied[1]['diff']

    def test_context(self):
        self.ci.context()
class TestRename(unittest.TestCase):
    """Rename handling, exercised against the 'testsvn-rename' repository
    found under forgesvn's ``tests/data/`` directory."""

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    def setup_with_tools(self):
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        data_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        # Point the app's repo at the bundled fixture before refreshing it.
        repo = c.app.repo
        repo.name = 'testsvn-rename'
        repo.fs_path = data_dir
        self.repo = repo
        self.repo.refresh()
        self.rev = self.repo.commit('HEAD')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_log_file_with_rename(self):
        # The first log entry for the renamed file must point back to its
        # former path and to the commit preceding the rename.
        log_entries = list(self.repo.log(path='/dir/b.txt', id_only=False))
        entry = log_entries[0]
        assert_equal(entry['id'], 3)
        rename = entry['rename_details']
        assert_equal(rename['path'], '/dir/a.txt')
        assert_equal(
            rename['commit_url'],
            self.repo.url_for_commit(2)  # previous revision
        )

    def test_check_changed_path(self):
        # A copy recorded on a directory is expected to be projected onto a
        # file beneath it: both path and copyfrom_path gain the file name.
        directory_copy = {'copyfrom_path': '/test/path',
                          'path': '/test/path2'}
        expected = {'path': '/test/path2/file.txt',
                    'copyfrom_path': '/test/path/file.txt'}
        actual = self.repo._impl._check_changed_path(
            directory_copy, '/test/path2/file.txt')
        assert_equal(expected, actual)
class TestDirectRepoAccess(object):
    """Per-commit diffs read from the 'testsvn' repository found under
    forgesvn's ``tests/data/`` directory.

    NOTE(review): this is a plain ``object`` subclass rather than a
    ``unittest.TestCase`` -- presumably collected by the test runner through
    its naming convention; confirm.
    """

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    def setup_with_tools(self):
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        data_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        # Point the app's repo at the bundled fixture before refreshing it.
        repo = c.app.repo
        repo.name = 'testsvn'
        repo.fs_path = data_dir
        self.repo = repo
        self.repo.refresh()
        self.rev = self.repo.commit('HEAD')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_paged_diffs(self):
        # HEAD (fetched in setup_with_tools) adds a single non-ASCII path.
        head_expected = {
            'added': [u'/ЗРЯЧИЙ_ТА_ПОБАЧИТЬ'],
            'removed': [],
            'changed': [],
            'copied': [],
            'total': 1,
        }
        assert_equals(self.rev.diffs, head_expected)

        # (revision number, expected diffs); checked in this order, and the
        # first mismatch aborts the test.
        expected_by_revision = [
            (2, {
                'added': [u'/a', u'/a/b', u'/a/b/c', u'/a/b/c/hello.txt'],
                'removed': [],
                'changed': [],
                'copied': [],
                'total': 4,
            }),
            (3, {
                'added': [],
                'removed': [],
                'changed': [u'/README'],
                'copied': [],
                'total': 1,
            }),
            (4, {
                'added': [],
                'removed': ['/a/b/c/hello.txt'],
                'changed': [],
                'copied': [],
                'total': 1,
            }),
        ]
        for revision, expected in expected_by_revision:
            commit_id = self.repo._impl._oid(revision)
            actual = self.repo.commit(commit_id).diffs
            assert_equals(actual, expected)