blob: bf594ce0b33a63578d9c0ffb6d91888c12a60145 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (C) 2014 Edgewall Software
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://trac.edgewall.org/log/.
import os
import tempfile
import unittest
from datetime import datetime, timedelta
from subprocess import Popen, PIPE
from trac.core import TracError
from trac.test import EnvironmentStub, Mock, MockPerm, locate
from trac.tests.compat import rmtree
from trac.util import create_file
from trac.util.compat import close_fds
from trac.util.datefmt import to_timestamp, utc
from trac.versioncontrol.api import Changeset, DbRepositoryProvider, \
NoSuchChangeset, NoSuchNode, \
RepositoryManager
from trac.versioncontrol.web_ui.browser import BrowserModule
from trac.versioncontrol.web_ui.log import LogModule
from trac.web.href import Href
from tracopt.versioncontrol.git.PyGIT import StorageFactory
from tracopt.versioncontrol.git.git_fs import GitCachedRepository, \
GitConnector, GitRepository
git_bin = None
class GitCommandMixin(object):
def _git_commit(self, *args, **kwargs):
env = kwargs.get('env') or os.environ.copy()
if 'date' in kwargs:
self._set_committer_date(env, kwargs.pop('date'))
args = ('commit',) + args
kwargs['env'] = env
return self._git(*args, **kwargs)
def _git(self, *args, **kwargs):
args = (git_bin,) + args
proc = Popen(args, stdout=PIPE, stderr=PIPE, close_fds=close_fds,
cwd=self.repos_path, **kwargs)
stdout, stderr = proc.communicate()
self.assertEqual(0, proc.returncode,
'git exits with %r, args %r, stdout %r, stderr %r' %
(proc.returncode, args, stdout, stderr))
return proc
def _git_date_format(self, dt):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=utc)
offset = dt.utcoffset()
secs = offset.days * 3600 * 24 + offset.seconds
hours, rem = divmod(abs(secs), 3600)
return '%d %c%02d:%02d' % (to_timestamp(dt), '-' if secs < 0 else '+',
hours, rem / 60)
def _set_committer_date(self, env, dt):
if not isinstance(dt, basestring):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=utc)
dt = self._git_date_format(dt)
env['GIT_COMMITTER_DATE'] = dt
env['GIT_AUTHOR_DATE'] = dt
class BaseTestCase(unittest.TestCase, GitCommandMixin):
def setUp(self):
self.env = EnvironmentStub()
self.repos_path = tempfile.mkdtemp(prefix='trac-gitrepos-')
if git_bin:
self.env.config.set('git', 'git_bin', git_bin)
def tearDown(self):
self._repomgr.reload_repositories()
StorageFactory._clean()
self.env.reset_db()
if os.path.isdir(self.repos_path):
rmtree(self.repos_path)
@property
def _repomgr(self):
return RepositoryManager(self.env)
@property
def _dbrepoprov(self):
return DbRepositoryProvider(self.env)
def _add_repository(self, reponame='gitrepos', bare=False):
path = self.repos_path \
if bare else os.path.join(self.repos_path, '.git')
self._dbrepoprov.add_repository(reponame, path, 'git')
def _git_init(self, data=True, bare=False):
if bare:
self._git('init', '--bare')
else:
self._git('init')
if not bare and data:
self._git('config', 'user.name', 'Joe')
self._git('config', 'user.email', 'joe@example.com')
create_file(os.path.join(self.repos_path, '.gitignore'))
self._git('add', '.gitignore')
self._git_commit('-a', '-m', 'test',
date=datetime(2001, 1, 29, 16, 39, 56))
class SanityCheckingTestCase(BaseTestCase):
def test_bare(self):
self._git_init(bare=True)
self._dbrepoprov.add_repository('gitrepos', self.repos_path, 'git')
self._repomgr.get_repository('gitrepos')
def test_non_bare(self):
self._git_init(bare=False)
self._dbrepoprov.add_repository('gitrepos.1',
os.path.join(self.repos_path, '.git'),
'git')
self._repomgr.get_repository('gitrepos.1')
self._dbrepoprov.add_repository('gitrepos.2', self.repos_path, 'git')
self._repomgr.get_repository('gitrepos.2')
def test_no_head_file(self):
self._git_init(bare=True)
os.unlink(os.path.join(self.repos_path, 'HEAD'))
self._dbrepoprov.add_repository('gitrepos', self.repos_path, 'git')
self.assertRaises(TracError, self._repomgr.get_repository, 'gitrepos')
def test_no_objects_dir(self):
self._git_init(bare=True)
rmtree(os.path.join(self.repos_path, 'objects'))
self._dbrepoprov.add_repository('gitrepos', self.repos_path, 'git')
self.assertRaises(TracError, self._repomgr.get_repository, 'gitrepos')
def test_no_refs_dir(self):
self._git_init(bare=True)
rmtree(os.path.join(self.repos_path, 'refs'))
self._dbrepoprov.add_repository('gitrepos', self.repos_path, 'git')
self.assertRaises(TracError, self._repomgr.get_repository, 'gitrepos')
class PersistentCacheTestCase(BaseTestCase):
def test_persistent(self):
self.env.config.set('git', 'persistent_cache', 'enabled')
self._git_init()
self._add_repository()
youngest = self._repository.youngest_rev
self._repomgr.reload_repositories() # clear repository cache
self._commit(datetime(2014, 1, 29, 16, 44, 54, 0, utc))
self.assertEqual(youngest, self._repository.youngest_rev)
self._repository.sync()
self.assertNotEqual(youngest, self._repository.youngest_rev)
def test_non_persistent(self):
self.env.config.set('git', 'persistent_cache', 'disabled')
self._git_init()
self._add_repository()
youngest = self._repository.youngest_rev
self._repomgr.reload_repositories() # clear repository cache
self._commit(datetime(2014, 1, 29, 16, 44, 54, 0, utc))
youngest_2 = self._repository.youngest_rev
self.assertNotEqual(youngest, youngest_2)
self._repository.sync()
self.assertNotEqual(youngest, self._repository.youngest_rev)
self.assertEqual(youngest_2, self._repository.youngest_rev)
def _commit(self, date):
gitignore = os.path.join(self.repos_path, '.gitignore')
create_file(gitignore, date.isoformat())
self._git_commit('-a', '-m', date.isoformat(), date=date)
@property
def _repository(self):
return self._repomgr.get_repository('gitrepos')
class HistoryTimeRangeTestCase(BaseTestCase):
def test_without_cache(self):
self._test_timerange('disabled')
def test_with_cache(self):
self._test_timerange('enabled')
def _test_timerange(self, cached_repository):
self.env.config.set('git', 'cached_repository', cached_repository)
self._git_init()
filename = os.path.join(self.repos_path, '.gitignore')
start = datetime(2000, 1, 1, 0, 0, 0, 0, utc)
ts = datetime(2014, 2, 5, 15, 24, 6, 0, utc)
for idx in xrange(3):
create_file(filename, 'commit-%d.txt' % idx)
self._git_commit('-a', '-m', 'commit %d' % idx, date=ts)
self._add_repository()
repos = self._repomgr.get_repository('gitrepos')
repos.sync()
revs = [repos.youngest_rev]
while True:
parents = repos.parent_revs(revs[-1])
if not parents:
break
revs.extend(parents)
self.assertEqual(4, len(revs))
csets = list(repos.get_changesets(start, ts))
self.assertEqual(1, len(csets))
self.assertEqual(revs[-1], csets[0].rev) # is oldest rev
csets = list(repos.get_changesets(start, ts + timedelta(seconds=1)))
self.assertEqual(revs, [cset.rev for cset in csets])
class GitNormalTestCase(BaseTestCase):
def _create_req(self, **kwargs):
data = dict(args={}, perm=MockPerm(), href=Href('/'), chrome={},
authname='trac', tz=utc, get_header=lambda name: None)
data.update(kwargs)
return Mock(**data)
def test_get_node(self):
self.env.config.set('git', 'persistent_cache', 'false')
self.env.config.set('git', 'cached_repository', 'false')
self._git_init()
self._add_repository()
repos = self._repomgr.get_repository('gitrepos')
rev = repos.youngest_rev
self.assertNotEqual(None, rev)
self.assertEqual(40, len(rev))
self.assertEqual(rev, repos.get_node('/').rev)
self.assertEqual(rev, repos.get_node('/', rev[:7]).rev)
self.assertEqual(rev, repos.get_node('/.gitignore').rev)
self.assertEqual(rev, repos.get_node('/.gitignore', rev[:7]).rev)
self.assertRaises(NoSuchNode, repos.get_node, '/non-existent')
self.assertRaises(NoSuchNode, repos.get_node, '/non-existent', rev[:7])
self.assertRaises(NoSuchNode, repos.get_node, '/non-existent', rev)
self.assertRaises(NoSuchChangeset,
repos.get_node, '/', 'invalid-revision')
self.assertRaises(NoSuchChangeset,
repos.get_node, '/.gitignore', 'invalid-revision')
self.assertRaises(NoSuchChangeset,
repos.get_node, '/non-existent', 'invalid-revision')
# git_fs doesn't support non-ANSI strings on Windows
if os.name != 'nt':
self._git('branch', u'tïckét10605', 'master')
repos.sync()
self.assertEqual(rev, repos.get_node('/', u'tïckét10605').rev)
self.assertEqual(rev, repos.get_node('/.gitignore',
u'tïckét10605').rev)
def _test_on_empty_repos(self, cached_repository):
self.env.config.set('git', 'persistent_cache', 'false')
self.env.config.set('git', 'cached_repository', cached_repository)
self._git_init(data=False, bare=True)
self._add_repository(bare=True)
repos = self._repomgr.get_repository('gitrepos')
repos.sync()
youngest_rev = repos.youngest_rev
self.assertEqual(None, youngest_rev)
self.assertEqual(None, repos.oldest_rev)
self.assertEqual(None, repos.normalize_rev(''))
self.assertEqual(None, repos.normalize_rev(None))
node = repos.get_node('/', youngest_rev)
self.assertEqual([], list(node.get_entries()))
self.assertEqual([], list(node.get_history()))
self.assertRaises(NoSuchNode, repos.get_node, '/path', youngest_rev)
req = self._create_req(path_info='/browser/gitrepos')
browser_mod = BrowserModule(self.env)
self.assertTrue(browser_mod.match_request(req))
rv = browser_mod.process_request(req)
self.assertEqual('browser.html', rv[0])
self.assertEqual(None, rv[1]['rev'])
req = self._create_req(path_info='/log/gitrepos')
log_mod = LogModule(self.env)
self.assertTrue(log_mod.match_request(req))
rv = log_mod.process_request(req)
self.assertEqual('revisionlog.html', rv[0])
self.assertEqual([], rv[1]['items'])
def test_on_empty_and_cached_repos(self):
self._test_on_empty_repos('true')
def test_on_empty_and_non_cached_repos(self):
self._test_on_empty_repos('false')
class GitRepositoryTestCase(BaseTestCase):
cached_repository = 'disabled'
def setUp(self):
BaseTestCase.setUp(self)
self.env.config.set('git', 'cached_repository', self.cached_repository)
def _create_merge_commit(self):
for idx, branch in enumerate(('alpha', 'beta')):
self._git('checkout', '-b', branch, 'master')
for n in xrange(2):
filename = 'file-%s-%d.txt' % (branch, n)
create_file(os.path.join(self.repos_path, filename))
self._git('add', filename)
self._git_commit('-a', '-m', filename,
date=datetime(2014, 2, 2, 17, 12,
n * 2 + idx))
self._git('checkout', 'alpha')
self._git('merge', '-m', 'Merge branch "beta" to "alpha"', 'beta')
def test_repository_instance(self):
self._git_init()
self._add_repository('gitrepos')
self.assertEqual(GitRepository,
type(self._repomgr.get_repository('gitrepos')))
def test_reset_head(self):
self._git_init()
create_file(os.path.join(self.repos_path, 'file.txt'), 'text')
self._git('add', 'file.txt')
self._git_commit('-a', '-m', 'test',
date=datetime(2014, 2, 2, 17, 12, 18))
self._add_repository('gitrepos')
repos = self._repomgr.get_repository('gitrepos')
repos.sync()
youngest_rev = repos.youngest_rev
entries = list(repos.get_node('').get_history())
self.assertEqual(2, len(entries))
self.assertEqual('', entries[0][0])
self.assertEqual(Changeset.EDIT, entries[0][2])
self.assertEqual('', entries[1][0])
self.assertEqual(Changeset.ADD, entries[1][2])
self._git('reset', '--hard', 'HEAD~')
repos.sync()
new_entries = list(repos.get_node('').get_history())
self.assertEqual(1, len(new_entries))
self.assertEqual(new_entries[0], entries[1])
self.assertNotEqual(youngest_rev, repos.youngest_rev)
def test_tags(self):
self._git_init()
self._add_repository('gitrepos')
repos = self._repomgr.get_repository('gitrepos')
repos.sync()
self.assertEqual(['master'], self._get_quickjump_names(repos))
self._git('tag', 'v1.0', 'master') # add tag
repos.sync()
self.assertEqual(['master', 'v1.0'], self._get_quickjump_names(repos))
self._git('tag', '-d', 'v1.0') # delete tag
repos.sync()
self.assertEqual(['master'], self._get_quickjump_names(repos))
def test_branchs(self):
self._git_init()
self._add_repository('gitrepos')
repos = self._repomgr.get_repository('gitrepos')
repos.sync()
self.assertEqual(['master'], self._get_quickjump_names(repos))
self._git('branch', 'alpha', 'master') # add branch
repos.sync()
self.assertEqual(['alpha', 'master'], self._get_quickjump_names(repos))
self._git('branch', '-m', 'alpha', 'beta') # rename branch
repos.sync()
self.assertEqual(['beta', 'master'], self._get_quickjump_names(repos))
self._git('branch', '-D', 'beta') # delete branch
repos.sync()
self.assertEqual(['master'], self._get_quickjump_names(repos))
def test_parent_child_revs(self):
self._git_init()
self._git('branch', 'initial')
self._create_merge_commit()
self._git('branch', 'latest')
self._add_repository('gitrepos')
repos = self._repomgr.get_repository('gitrepos')
repos.sync()
rev = repos.normalize_rev('initial')
children = repos.child_revs(rev)
self.assertEqual(2, len(children), 'child_revs: %r' % children)
parents = repos.parent_revs(rev)
self.assertEqual(0, len(parents), 'parent_revs: %r' % parents)
self.assertEqual(1, len(repos.child_revs(children[0])))
self.assertEqual(1, len(repos.child_revs(children[1])))
rev = repos.normalize_rev('latest')
children = repos.child_revs(rev)
self.assertEqual(0, len(children), 'child_revs: %r' % children)
parents = repos.parent_revs(rev)
self.assertEqual(2, len(parents), 'parent_revs: %r' % parents)
self.assertEqual(1, len(repos.parent_revs(parents[0])))
self.assertEqual(1, len(repos.parent_revs(parents[1])))
def _get_quickjump_names(self, repos):
return sorted(name for type, name, path, rev
in repos.get_quickjump_entries('HEAD'))
class GitCachedRepositoryTestCase(GitRepositoryTestCase):
cached_repository = 'enabled'
def test_repository_instance(self):
self._git_init()
self._add_repository('gitrepos')
self.assertEqual(GitCachedRepository,
type(self._repomgr.get_repository('gitrepos')))
def test_sync(self):
self._git_init()
for idx in xrange(3):
filename = 'file%d.txt' % idx
create_file(os.path.join(self.repos_path, filename))
self._git('add', filename)
self._git_commit('-a', '-m', filename,
date=datetime(2014, 2, 2, 17, 12, idx))
self._add_repository('gitrepos')
repos = self._repomgr.get_repository('gitrepos')
revs = [entry[1] for entry in repos.repos.get_node('').get_history()]
revs.reverse()
revs2 = []
def feedback(rev):
revs2.append(rev)
repos.sync(feedback=feedback)
self.assertEqual(revs, revs2)
self.assertEqual(4, len(revs2))
revs2 = []
def feedback_1(rev):
revs2.append(rev)
if len(revs2) == 2:
raise StopSync
def feedback_2(rev):
revs2.append(rev)
try:
repos.sync(feedback=feedback_1, clean=True)
except StopSync:
self.assertEqual(revs[:2], revs2)
repos.sync(feedback=feedback_2) # restart sync
self.assertEqual(revs, revs2)
def test_sync_merge(self):
self._git_init()
self._create_merge_commit()
self._add_repository('gitrepos')
repos = self._repomgr.get_repository('gitrepos')
youngest_rev = repos.repos.youngest_rev
oldest_rev = repos.repos.oldest_rev
revs = []
def feedback(rev):
revs.append(rev)
repos.sync(feedback=feedback)
self.assertEqual(6, len(revs))
self.assertEqual(youngest_rev, revs[-1])
self.assertEqual(oldest_rev, revs[0])
revs2 = []
def feedback_1(rev):
revs2.append(rev)
if len(revs2) == 3:
raise StopSync
def feedback_2(rev):
revs2.append(rev)
try:
repos.sync(feedback=feedback_1, clean=True)
except StopSync:
self.assertEqual(revs[:3], revs2)
repos.sync(feedback=feedback_2) # restart sync
self.assertEqual(revs, revs2)
class StopSync(Exception):
pass
def suite():
global git_bin
suite = unittest.TestSuite()
git_bin = locate('git')
if git_bin:
suite.addTest(unittest.makeSuite(SanityCheckingTestCase))
suite.addTest(unittest.makeSuite(PersistentCacheTestCase))
suite.addTest(unittest.makeSuite(HistoryTimeRangeTestCase))
suite.addTest(unittest.makeSuite(GitNormalTestCase))
suite.addTest(unittest.makeSuite(GitRepositoryTestCase))
suite.addTest(unittest.makeSuite(GitCachedRepositoryTestCase))
else:
print("SKIP: tracopt/versioncontrol/git/tests/git_fs.py (git cli "
"binary, 'git', not found)")
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')