| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import datetime |
| import unittest |
| from mock import patch, Mock, MagicMock |
| from nose.tools import assert_equal |
| from datadiff import tools as dd |
| |
| from pylons import tmpl_context as c |
| |
| from allura import model as M |
| from allura.controllers.repository import topo_sort |
| from allura.model.repository import zipdir, prefix_paths_union |
| from allura.model.repo_refresh import ( |
| CommitRunDoc, |
| CommitRunBuilder, |
| _group_commits, |
| ) |
| from alluratest.controller import setup_unit_test |
| |
| |
| class TestCommitRunBuilder(unittest.TestCase): |
| |
| def setUp(self): |
| setup_unit_test() |
| commits = [ |
| M.repository.CommitDoc.make(dict( |
| _id=str(i))) |
| for i in range(10)] |
| for p, c in zip(commits, commits[1:]): |
| p.child_ids = [c._id] |
| c.parent_ids = [p._id] |
| for ci in commits: |
| ci.m.save() |
| self.commits = commits |
| |
| def test_single_pass(self): |
| crb = CommitRunBuilder( |
| [ci._id for ci in self.commits]) |
| crb.run() |
| self.assertEqual(CommitRunDoc.m.count(), 1) |
| |
| def test_two_pass(self): |
| crb = CommitRunBuilder( |
| [ci._id for ci in self.commits[:5]]) |
| crb.run() |
| crb = CommitRunBuilder( |
| [ci._id for ci in self.commits[5:]]) |
| crb.run() |
| self.assertEqual(CommitRunDoc.m.count(), 2) |
| crb.cleanup() |
| self.assertEqual(CommitRunDoc.m.count(), 1) |
| |
| def test_svn_like(self): |
| for ci in self.commits: |
| crb = CommitRunBuilder([ci._id]) |
| crb.run() |
| crb.cleanup() |
| self.assertEqual(CommitRunDoc.m.count(), 1) |
| |
| def test_reversed(self): |
| for ci in reversed(self.commits): |
| crb = CommitRunBuilder([ci._id]) |
| crb.run() |
| crb.cleanup() |
| self.assertEqual(CommitRunDoc.m.count(), 1) |
| |
| |
| class TestTopoSort(unittest.TestCase): |
| |
| def test_commit_dates_out_of_order(self): |
| """Commits should be sorted by their parent/child relationships, |
| regardless of the date on the commit. |
| """ |
| head_ids = ['dev', 'master'] |
| parents = { |
| 'dev': ['dev@{1}'], |
| 'dev@{1}': ['master'], |
| 'master': ['master@{1}'], |
| 'master@{1}': ['master@{2}'], |
| 'master@{2}': ['master@{3}'], |
| 'master@{3}': []} |
| children = { |
| 'master@{3}': ['master@{2}'], |
| 'master@{2}': ['master@{1}'], |
| 'master@{1}': ['master'], |
| 'master': ['dev@{1}'], |
| 'dev@{1}': ['dev'], |
| 'dev': []} |
| dates = { |
| 'dev@{1}': datetime.datetime(2012, 1, 1), |
| 'master@{3}': datetime.datetime(2012, 2, 1), |
| 'master@{2}': datetime.datetime(2012, 3, 1), |
| 'master@{1}': datetime.datetime(2012, 4, 1), |
| 'master': datetime.datetime(2012, 5, 1), |
| 'dev': datetime.datetime(2012, 6, 1)} |
| result = topo_sort(children, parents, dates, head_ids) |
| self.assertEqual(list(result), ['dev', 'dev@{1}', 'master', |
| 'master@{1}', 'master@{2}', 'master@{3}']) |
| |
| |
| def tree(name, id, trees=None, blobs=None): |
| t = Mock(tree_ids=[], blob_ids=[], other_ids=[]) |
| t.name = name |
| t.id = id |
| t._id = id |
| if trees is not None: |
| t.tree_ids = trees |
| if blobs is not None: |
| t.blob_ids = blobs |
| return t |
| |
| |
| def blob(name, id): |
| b = Mock() |
| b.name = name |
| b.id = id |
| return b |
| |
| |
| class TestTree(unittest.TestCase): |
| |
| @patch('allura.model.repository.Tree.__getitem__') |
| def test_get_obj_by_path(self, getitem): |
| tree = M.repository.Tree() |
| # test with relative path |
| tree.get_obj_by_path('some/path/file.txt') |
| getitem.assert_called_with('some') |
| getitem().__getitem__.assert_called_with('path') |
| getitem().__getitem__().__getitem__.assert_called_with('file.txt') |
| # test with absolute path |
| tree.get_obj_by_path('/some/path/file.txt') |
| getitem.assert_called_with('some') |
| getitem().__getitem__.assert_called_with('path') |
| getitem().__getitem__().__getitem__.assert_called_with('file.txt') |
| |
| |
| class TestBlob(unittest.TestCase): |
| |
| def test_pypeline_view(self): |
| blob = M.repository.Blob(Mock(), Mock(), Mock()) |
| blob._id = 'blob1' |
| blob.path = Mock(return_value='path') |
| blob.name = 'INSTALL.mdown' |
| blob.extension = '.mdown' |
| assert_equal(blob.has_pypeline_view, True) |
| |
| |
| class TestCommit(unittest.TestCase): |
| |
| def test_activity_extras(self): |
| commit = M.repository.Commit() |
| commit.shorthand_id = MagicMock(return_value='abcdef') |
| commit.message = 'commit msg' |
| self.assertIn('allura_id', commit.activity_extras) |
| self.assertEqual(commit.activity_extras['summary'], commit.summary) |
| |
| def test_get_path_no_create(self): |
| commit = M.repository.Commit() |
| commit.get_tree = MagicMock() |
| commit.get_path('foo/', create=False) |
| commit.get_tree.assert_called_with(False) |
| commit.get_tree().__getitem__.assert_called_with('foo') |
| commit.get_tree().__getitem__.assert_not_called_with('') |
| |
| def test_get_tree_no_create(self): |
| c.model_cache = Mock() |
| c.model_cache.get.return_value = None |
| commit = M.repository.Commit() |
| commit.repo = Mock() |
| |
| commit.tree_id = None |
| tree = commit.get_tree(create=False) |
| assert not commit.repo.compute_tree_new.called |
| assert not c.model_cache.get.called |
| assert_equal(tree, None) |
| |
| commit.tree_id = 'tree' |
| tree = commit.get_tree(create=False) |
| assert not commit.repo.compute_tree_new.called |
| c.model_cache.get.assert_called_with(M.repository.Tree, dict(_id='tree')) |
| assert_equal(tree, None) |
| |
| _tree = Mock() |
| c.model_cache.get.return_value = _tree |
| tree = commit.get_tree(create=False) |
| _tree.set_context.assert_called_with(commit) |
| assert_equal(tree, _tree) |
| |
| @patch.object(M.repository.Tree.query, 'get') |
| def test_get_tree_create(self, tree_get): |
| c.model_cache = Mock() |
| c.model_cache.get.return_value = None |
| commit = M.repository.Commit() |
| commit.repo = Mock() |
| |
| commit.repo.compute_tree_new.return_value = None |
| commit.tree_id = None |
| tree = commit.get_tree() |
| commit.repo.compute_tree_new.assert_called_once_with(commit) |
| assert not c.model_cache.get.called |
| assert not tree_get.called |
| assert_equal(tree, None) |
| |
| commit.repo.compute_tree_new.reset_mock() |
| commit.repo.compute_tree_new.return_value = 'tree' |
| _tree = Mock() |
| c.model_cache.get.return_value = _tree |
| tree = commit.get_tree() |
| commit.repo.compute_tree_new.assert_called_once_with(commit) |
| assert not tree_get.called |
| c.model_cache.get.assert_called_once_with( |
| M.repository.Tree, dict(_id='tree')) |
| _tree.set_context.assert_called_once_with(commit) |
| assert_equal(tree, _tree) |
| |
| commit.repo.compute_tree_new.reset_mock() |
| c.model_cache.get.reset_mock() |
| commit.tree_id = 'tree2' |
| tree = commit.get_tree() |
| assert not commit.repo.compute_tree_new.called |
| assert not tree_get.called |
| c.model_cache.get.assert_called_once_with( |
| M.repository.Tree, dict(_id='tree2')) |
| _tree.set_context.assert_called_once_with(commit) |
| assert_equal(tree, _tree) |
| |
| commit.repo.compute_tree_new.reset_mock() |
| c.model_cache.get.reset_mock() |
| c.model_cache.get.return_value = None |
| tree_get.return_value = _tree |
| tree = commit.get_tree() |
| c.model_cache.get.assert_called_once_with( |
| M.repository.Tree, dict(_id='tree2')) |
| commit.repo.compute_tree_new.assert_called_once_with(commit) |
| assert_equal(commit.tree_id, 'tree') |
| tree_get.assert_called_once_with(_id='tree') |
| c.model_cache.set.assert_called_once_with( |
| M.repository.Tree, dict(_id='tree'), _tree) |
| _tree.set_context.assert_called_once_with(commit) |
| assert_equal(tree, _tree) |
| |
| def test_tree_create(self): |
| commit = M.repository.Commit() |
| commit.get_tree = Mock() |
| tree = commit.tree |
| commit.get_tree.assert_called_with(create=True) |
| |
| |
| class TestZipDir(unittest.TestCase): |
| |
| @patch('allura.model.repository.Popen') |
| @patch('allura.model.repository.tg') |
| def test_popen_called(self, tg, popen): |
| from subprocess import PIPE |
| popen.return_value.communicate.return_value = 1, 2 |
| popen.return_value.returncode = 0 |
| tg.config = {'scm.repos.tarball.zip_binary': '/bin/zip'} |
| src = '/fake/path/to/repo' |
| zipfile = '/fake/zip/file.tmp' |
| zipdir(src, zipfile) |
| popen.assert_called_once_with( |
| ['/bin/zip', '-y', '-q', '-r', zipfile, 'repo'], |
| cwd='/fake/path/to', stdout=PIPE, stderr=PIPE) |
| popen.reset_mock() |
| src = '/fake/path/to/repo/' |
| zipdir(src, zipfile, exclude='file.txt') |
| popen.assert_called_once_with( |
| ['/bin/zip', '-y', '-q', '-r', |
| zipfile, 'repo', '-x', 'file.txt'], |
| cwd='/fake/path/to', stdout=PIPE, stderr=PIPE) |
| |
| @patch('allura.model.repository.Popen') |
| @patch('allura.model.repository.tg') |
| def test_exception_logged(self, tg, popen): |
| tg.config = {'scm.repos.tarball.zip_binary': '/bin/zip'} |
| popen.return_value.communicate.return_value = 1, 2 |
| popen.return_value.returncode = 1 |
| src = '/fake/path/to/repo' |
| zipfile = '/fake/zip/file.tmp' |
| with self.assertRaises(Exception) as cm: |
| zipdir(src, zipfile) |
| emsg = str(cm.exception) |
| self.assertTrue( |
| "Command: " |
| "['/bin/zip', '-y', '-q', '-r', '/fake/zip/file.tmp', 'repo'] " |
| "returned non-zero exit code 1" in emsg) |
| self.assertTrue("STDOUT: 1" in emsg) |
| self.assertTrue("STDERR: 2" in emsg) |
| |
| |
| class TestPrefixPathsUnion(unittest.TestCase): |
| |
| def test_disjoint(self): |
| a = set(['a1', 'a2', 'a3']) |
| b = set(['b1', 'b1/foo', 'b2']) |
| self.assertItemsEqual(prefix_paths_union(a, b), []) |
| |
| def test_exact(self): |
| a = set(['a1', 'a2', 'a3']) |
| b = set(['b1', 'a2', 'a3']) |
| self.assertItemsEqual(prefix_paths_union(a, b), ['a2', 'a3']) |
| |
| def test_prefix(self): |
| a = set(['a1', 'a2', 'a3']) |
| b = set(['b1', 'a2/foo', 'b3/foo']) |
| self.assertItemsEqual(prefix_paths_union(a, b), ['a2']) |
| |
| |
| class TestGroupCommits(object): |
| |
| def setUp(self): |
| self.repo = Mock() |
| self.repo.symbolics_for_commit.return_value = ([], []) |
| |
| def test_no_branches(self): |
| b, t = _group_commits(self.repo, ['3', '2', '1']) |
| dd.assert_equal(b, {'__default__': ['3', '2', '1']}) |
| dd.assert_equal(t, {}) |
| |
| def test_branches_and_tags(self): |
| self.repo.symbolics_for_commit.side_effect = [ |
| (['master'], ['v1.1']), |
| ([], []), |
| ([], []), |
| ] |
| b, t = _group_commits(self.repo, ['3', '2', '1']) |
| dd.assert_equal(b, {'master': ['3', '2', '1']}) |
| dd.assert_equal(t, {'v1.1': ['3', '2', '1']}) |
| |
| def test_multiple_branches(self): |
| self.repo.symbolics_for_commit.side_effect = [ |
| (['master'], ['v1.1']), |
| ([], ['v1.0']), |
| (['test1', 'test2'], []), |
| ] |
| b, t = _group_commits(self.repo, ['3', '2', '1']) |
| dd.assert_equal(b, {'master': ['3', '2'], |
| 'test1': ['1'], |
| 'test2': ['1']}) |
| dd.assert_equal(t, {'v1.1': ['3'], |
| 'v1.0': ['2', '1']}) |