#       Licensed to the Apache Software Foundation (ASF) under one
#       or more contributor license agreements.  See the NOTICE file
#       distributed with this work for additional information
#       regarding copyright ownership.  The ASF licenses this file
#       to you under the Apache License, Version 2.0 (the
#       "License"); you may not use this file except in compliance
#       with the License.  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#       Unless required by applicable law or agreed to in writing,
#       software distributed under the License is distributed on an
#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#       KIND, either express or implied.  See the License for the
#       specific language governing permissions and limitations
#       under the License.

import os
import shutil
import unittest
import pkg_resources
from itertools import count, product
from datetime import datetime
from zipfile import ZipFile

from collections import defaultdict
from pylons import tmpl_context as c, app_globals as g
import mock
from nose.tools import assert_equal
import tg
import ming
from ming.base import Object
from ming.orm import session, ThreadLocalORMSession
from testfixtures import TempDirectory
from IPython.testing.decorators import onlyif
import pysvn

from alluratest.controller import setup_basic_test, setup_global_objects
from allura import model as M
from allura.model.repo_refresh import send_notifications
from allura.lib import helpers as h
from allura.tests import decorators as td
from allura.tests.model.test_repo import RepoImplTestBase

from forgesvn import model as SM
from forgesvn.model.svn import svn_path_exists
from forgesvn.tests import with_svn
from allura.tests.decorators import with_tool

class TestNewRepo(unittest.TestCase):

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    def setup_with_tools(self):
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        repo_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        self.repo = SM.Repository(
            name='testsvn',
            fs_path=repo_dir,
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        self.repo.refresh()
        self.rev = self.repo.commit('HEAD')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_last_commit_for(self):
        tree = self.rev.tree
        for row in tree.ls():
            assert row['last_commit']['author'] is not None

    def test_commit(self):
        assert self.rev.primary() is self.rev
        assert self.rev.index_id().startswith('allura/model/repo/Commit#')
        self.rev.author_url
        self.rev.committer_url
        assert self.rev.tree._id == self.rev.tree_id
        assert self.rev.summary == self.rev.message.splitlines()[0]
        assert self.rev.shorthand_id() == '[r5]'
        assert self.rev.symbolic_ids == ([], [])
        assert self.rev.url() == (
            '/p/test/src/5/')
        all_cis = list(self.repo.log(self.rev._id))
        assert len(all_cis) == 5
        self.rev.tree.ls()
        assert self.rev.tree.readme() == (
            'README', 'This is readme\nAnother Line\n')
        assert self.rev.tree.path() == '/'
        assert self.rev.tree.url() == (
            '/p/test/src/5/tree/')
        self.rev.tree.by_name['README']
        assert self.rev.tree.is_blob('README') == True
        assert self.rev.tree['a']['b']['c'].ls() == []
        self.assertRaises(KeyError, lambda:self.rev.tree['a']['b']['d'])

class TestSVNRepo(unittest.TestCase, RepoImplTestBase):

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    @with_tool('test', 'SVN', 'svn-tags', 'SVN with tags')
    def setup_with_tools(self):
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        repo_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        self.repo = SM.Repository(
            name='testsvn',
            fs_path=repo_dir,
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        self.repo.refresh()
        self.svn_tags = SM.Repository(
            name='testsvn-trunk-tags-branches',
            fs_path=repo_dir,
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        self.svn_tags.refresh()
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_init(self):
        repo = SM.Repository(
            name='testsvn',
            fs_path=g.tmpdir+'/',
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        dirname = os.path.join(repo.fs_path, repo.name)
        if os.path.exists(dirname):
            shutil.rmtree(dirname)
        repo.init()
        shutil.rmtree(dirname)

    def test_fork(self):
        repo = SM.Repository(
            name='testsvn',
            fs_path=g.tmpdir+'/',
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        repo_path = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/testsvn')
        dirname = os.path.join(repo.fs_path, repo.name)
        if os.path.exists(dirname):
            shutil.rmtree(dirname)
        repo.init()
        repo._impl.clone_from('file://' + repo_path)
        assert not os.path.exists(os.path.join(g.tmpdir, 'testsvn/hooks/pre-revprop-change'))
        assert os.path.exists(os.path.join(g.tmpdir, 'testsvn/hooks/post-commit'))
        assert os.access(os.path.join(g.tmpdir, 'testsvn/hooks/post-commit'), os.X_OK)
        with open(os.path.join(g.tmpdir, 'testsvn/hooks/post-commit')) as f:
            c = f.read()
        self.assertIn('curl -s http://localhost/auth/refresh_repo/p/test/src/\n', c)
        self.assertIn('exec $DIR/post-commit-user "$@"\n', c)

        repo.refresh(notify=False)
        assert len(list(repo.log()))

        shutil.rmtree(dirname)

    @mock.patch('forgesvn.model.svn.tg')
    def test_can_hotcopy(self, tg):
        from forgesvn.model.svn import SVNImplementation
        func = SVNImplementation.can_hotcopy
        obj = mock.Mock(spec=SVNImplementation)
        for combo in product(
                ['file:///myfile', 'http://myfile'],
                [True, False],
                ['version 1.7', 'version 1.6', 'version 2.0.3']):
            source_url = combo[0]
            tg.config = {'scm.svn.hotcopy': combo[1]}
            stdout = combo[2]
            obj.check_call.return_value = stdout, ''
            expected = (source_url.startswith('file://') and
                    tg.config['scm.svn.hotcopy'] and
                    stdout != 'version 1.6')
            result = func(obj, source_url)
            assert result == expected

    @mock.patch('forgesvn.model.svn.g.post_event')
    def test_clone(self, post_event):
        repo = SM.Repository(
            name='testsvn',
            fs_path=g.tmpdir+'/',
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        repo_path = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/testsvn')
        dirname = os.path.join(repo.fs_path, repo.name)
        if os.path.exists(dirname):
            shutil.rmtree(dirname)
        repo.init()
        repo._impl.clone_from('file://' + repo_path)
        assert not os.path.exists(os.path.join(g.tmpdir, 'testsvn/hooks/pre-revprop-change'))
        assert os.path.exists(os.path.join(g.tmpdir, 'testsvn/hooks/post-commit'))
        assert os.access(os.path.join(g.tmpdir, 'testsvn/hooks/post-commit'), os.X_OK)
        with open(os.path.join(g.tmpdir, 'testsvn/hooks/post-commit')) as f:
            c = f.read()
        self.assertIn('curl -s http://localhost/auth/refresh_repo/p/test/src/\n', c)
        self.assertIn('exec $DIR/post-commit-user "$@"\n', c)

        repo.refresh(notify=False)
        assert len(list(repo.log()))

        shutil.rmtree(dirname)

    def test_index(self):
        i = self.repo.index()
        assert i['type_s'] == 'SVN Repository', i

    def test_log_id_only(self):
        entries = list(self.repo.log(id_only=True))
        assert_equal(entries, [5, 4, 3, 2, 1])

    def test_log(self):
        entries = list(self.repo.log(id_only=False))
        assert_equal(entries, [
            {'authored': {'date': datetime(2010, 11, 18, 20, 14, 21, 515743),
                          'email': '',
                          'name': u'rick446'},
             'committed': {'date': datetime(2010, 11, 18, 20, 14, 21, 515743),
                           'email': '',
                           'name': u'rick446'},
             'id': 5,
             'message': u'Copied a => b',
             'parents': [4],
             'refs': ['HEAD'],
             'size': 0},
            {'authored': {'date': datetime(2010, 10, 8, 15, 32, 59, 383719),
                          'email': '',
                          'name': u'rick446'},
             'committed': {'date': datetime(2010, 10, 8, 15, 32, 59, 383719),
                           'email': '',
                           'name': u'rick446'},
             'id': 4,
             'message': u'Remove hello.txt',
             'parents': [3],
             'refs': [],
             'size': 0},
            {'authored': {'date': datetime(2010, 10, 8, 15, 32, 48, 272296),
                          'email': '',
                          'name': u'rick446'},
             'committed': {'date': datetime(2010, 10, 8, 15, 32, 48, 272296),
                           'email': '',
                           'name': u'rick446'},
             'id': 3,
             'message': u'Modify readme',
             'parents': [2],
             'refs': [],
             'size': 0},
            {'authored': {'date': datetime(2010, 10, 8, 15, 32, 36, 221863),
                          'email': '',
                          'name': u'rick446'},
             'committed': {'date': datetime(2010, 10, 8, 15, 32, 36, 221863),
                           'email': '',
                           'name': u'rick446'},
             'id': 2,
             'message': u'Add path',
             'parents': [1],
             'refs': [],
             'size': 0},
            {'authored': {'date': datetime(2010, 10, 8, 15, 32, 7, 238375),
                          'email': '',
                          'name': u'rick446'},
             'committed': {'date': datetime(2010, 10, 8, 15, 32, 7, 238375),
                           'email': '',
                           'name': u'rick446'},
             'id': 1,
             'message': u'Create readme',
             'parents': [],
             'refs': [],
             'size': 0},
            ])

    def test_log_file(self):
        entries = list(self.repo.log(path='/README', id_only=False))
        assert_equal(entries, [
            {'authored': {'date': datetime(2010, 10, 8, 15, 32, 48, 272296),
                          'email': '',
                          'name': u'rick446'},
             'committed': {'date': datetime(2010, 10, 8, 15, 32, 48, 272296),
                           'email': '',
                           'name': u'rick446'},
             'id': 3,
             'message': u'Modify readme',
             'parents': [2],
             'refs': [],
             'size': 28},
            {'authored': {'date': datetime(2010, 10, 8, 15, 32, 7, 238375),
                          'email': '',
                          'name': u'rick446'},
             'committed': {'date': datetime(2010, 10, 8, 15, 32, 7, 238375),
                           'email': '',
                           'name': u'rick446'},
             'id': 1,
             'message': u'Create readme',
             'parents': [],
             'refs': [],
             'size': 28},
            ])

    def test_is_file(self):
        assert self.repo.is_file('/README')
        assert not self.repo.is_file('/a')

    def test_paged_diffs(self):
        entry = self.repo.commit(self.repo.log(2, id_only=True).next())
        self.assertEqual(entry.diffs, entry.paged_diffs())
        self.assertEqual(entry.diffs, entry.paged_diffs(start=0))
        added_expected = entry.diffs.added[1:3]
        expected =  dict(
                copied=[], changed=[], removed=[],
                added=added_expected, total=4)
        actual = entry.paged_diffs(start=1, end=3)
        self.assertEqual(expected, actual)

        empty = M.repo.Commit().paged_diffs()
        self.assertEqual(sorted(actual.keys()), sorted(empty.keys()))

    def test_diff_create_file(self):
        entry = self.repo.commit(self.repo.log(1, id_only=True).next())
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=[],
                removed=[], added=['/README'], total=1))

    def test_diff_create_path(self):
        entry = self.repo.commit(self.repo.log(2, id_only=True).next())
        actual = entry.diffs
        actual.added = sorted(actual.added)
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=[], removed=[],
                added=sorted([
                    '/a', '/a/b', '/a/b/c',
                    '/a/b/c/hello.txt']), total=4))

    def test_diff_modify_file(self):
        entry = self.repo.commit(self.repo.log(3, id_only=True).next())
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=['/README'],
                removed=[], added=[], total=1))

    def test_diff_delete(self):
        entry = self.repo.commit(self.repo.log(4, id_only=True).next())
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=[],
                removed=['/a/b/c/hello.txt'], added=[], total=1))

    def test_diff_copy(self):
        # Copies are currently only detected as 'add'
        entry = self.repo.commit(self.repo.log(5, id_only=True).next())
        self.assertEqual(
            entry.diffs, dict(
                copied=[], changed=[],
                removed=[], added=['/b'], total=1))

    def test_commit(self):
        entry = self.repo.commit(1)
        assert entry.committed.name == 'rick446'
        assert entry.message

    def test_svn_path_exists(self):
        repo_path = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/testsvn')
        assert svn_path_exists("file://%s/a" % repo_path)
        assert svn_path_exists("file://%s" % repo_path)
        assert not svn_path_exists("file://%s/badpath" % repo_path)

    @onlyif(os.path.exists(tg.config.get('scm.repos.tarball.zip_binary', '/usr/bin/zip')), 'zip binary is missing')
    def test_tarball(self):
        tmpdir = tg.config['scm.repos.tarball.root']
        assert_equal(self.repo.tarball_path, os.path.join(tmpdir, 'svn/t/te/test/testsvn'))
        assert_equal(self.repo.tarball_url('1'), 'file:///svn/t/te/test/testsvn/test-src-1.zip')
        self.repo.tarball('1')
        assert os.path.isfile(os.path.join(tmpdir, "svn/t/te/test/testsvn/test-src-1.zip"))
        tarball_zip = ZipFile(os.path.join(tmpdir, 'svn/t/te/test/testsvn/test-src-1.zip'), 'r')
        assert_equal(tarball_zip.namelist(), ['test-src-1/', 'test-src-1/README'])
        shutil.rmtree(self.repo.tarball_path, ignore_errors=True)

    @onlyif(os.path.exists(tg.config.get('scm.repos.tarball.zip_binary', '/usr/bin/zip')), 'zip binary is missing')
    def test_tarball_aware_of_tags(self):
        rev = '19'
        tag_content = sorted(['test-svn-tags-19-tags-tag-1.0/',
                              'test-svn-tags-19-tags-tag-1.0/svn-commit.tmp',
                              'test-svn-tags-19-tags-tag-1.0/README'])
        h.set_context('test', 'svn-tags', neighborhood='Projects')
        tmpdir = tg.config['scm.repos.tarball.root']
        tarball_path = os.path.join(tmpdir, 'svn/t/te/test/testsvn-trunk-tags-branches/')
        fn = tarball_path + 'test-svn-tags-19-tags-tag-1.0.zip'
        self.svn_tags.tarball(rev, '/tags/tag-1.0/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()), tag_content)
        os.remove(fn)
        self.svn_tags.tarball(rev, '/tags/tag-1.0/some/path/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()), tag_content)
        os.remove(fn)
        # if inside of tags, but no tag is specified
        # expect snapshot of trunk
        fn = tarball_path + 'test-svn-tags-19-trunk.zip'
        self.svn_tags.tarball(rev, '/tags/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()),
                     sorted(['test-svn-tags-19-trunk/',
                             'test-svn-tags-19-trunk/aaa.txt',
                             'test-svn-tags-19-trunk/bbb.txt',
                             'test-svn-tags-19-trunk/ccc.txt',
                             'test-svn-tags-19-trunk/README']))
        shutil.rmtree(tarball_path, ignore_errors=True)

    @onlyif(os.path.exists(tg.config.get('scm.repos.tarball.zip_binary', '/usr/bin/zip')), 'zip binary is missing')
    def test_tarball_aware_of_branches(self):
        rev = '19'
        branch_content = sorted(['test-svn-tags-19-branches-aaa/',
                                 'test-svn-tags-19-branches-aaa/aaa.txt',
                                 'test-svn-tags-19-branches-aaa/svn-commit.tmp',
                                 'test-svn-tags-19-branches-aaa/README'])
        h.set_context('test', 'svn-tags', neighborhood='Projects')
        tmpdir = tg.config['scm.repos.tarball.root']
        tarball_path = os.path.join(tmpdir, 'svn/t/te/test/testsvn-trunk-tags-branches/')
        fn = tarball_path + 'test-svn-tags-19-branches-aaa.zip'
        self.svn_tags.tarball(rev, '/branches/aaa/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()), branch_content)
        os.remove(fn)
        self.svn_tags.tarball(rev, '/branches/aaa/some/path/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()), branch_content)
        os.remove(fn)
        # if inside of branches, but no branch is specified
        # expect snapshot of trunk
        fn = tarball_path + 'test-svn-tags-19-trunk.zip'
        self.svn_tags.tarball(rev, '/branches/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()),
                     sorted(['test-svn-tags-19-trunk/',
                             'test-svn-tags-19-trunk/aaa.txt',
                             'test-svn-tags-19-trunk/bbb.txt',
                             'test-svn-tags-19-trunk/ccc.txt',
                             'test-svn-tags-19-trunk/README']))
        shutil.rmtree(tarball_path, ignore_errors=True)

    @onlyif(os.path.exists(tg.config.get('scm.repos.tarball.zip_binary', '/usr/bin/zip')), 'zip binary is missing')
    def test_tarball_aware_of_trunk(self):
        rev = '19'
        trunk_content = sorted(['test-svn-tags-19-trunk/',
                                'test-svn-tags-19-trunk/aaa.txt',
                                'test-svn-tags-19-trunk/bbb.txt',
                                'test-svn-tags-19-trunk/ccc.txt',
                                'test-svn-tags-19-trunk/README'])
        h.set_context('test', 'svn-tags', neighborhood='Projects')
        tmpdir = tg.config['scm.repos.tarball.root']
        tarball_path = os.path.join(tmpdir, 'svn/t/te/test/testsvn-trunk-tags-branches/')
        fn = tarball_path + 'test-svn-tags-19-trunk.zip'
        self.svn_tags.tarball(rev, '/trunk/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()), trunk_content)
        os.remove(fn)
        self.svn_tags.tarball(rev, '/trunk/some/path/')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()), trunk_content)
        os.remove(fn)
        # no path, but there are trunk in the repo
        # expect snapshot of trunk
        self.svn_tags.tarball(rev)
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(sorted(snapshot.namelist()), trunk_content)
        os.remove(fn)
        # no path, and no trunk dir
        # expect snapshot of repo root
        h.set_context('test', 'src', neighborhood='Projects')
        fn = os.path.join(tmpdir, 'svn/t/te/test/testsvn/test-src-1.zip')
        self.repo.tarball('1')
        assert os.path.isfile(fn), fn
        snapshot = ZipFile(fn, 'r')
        assert_equal(snapshot.namelist(), ['test-src-1/', 'test-src-1/README'])
        shutil.rmtree(os.path.join(tmpdir, 'svn/t/te/test/testsvn/'), ignore_errors=True)
        shutil.rmtree(tarball_path, ignore_errors=True)

    def test_is_empty(self):
        assert not self.repo.is_empty()
        with TempDirectory() as d:
            repo2 = SM.Repository(
                name='test',
                fs_path=d.path,
                url_path = '/test/',
                tool = 'svn',
                status = 'creating')
            repo2.init()
            assert repo2.is_empty()
            repo2.refresh()
            ThreadLocalORMSession.flush_all()
            assert repo2.is_empty()

class TestSVNRev(unittest.TestCase):

    def setUp(self):
        setup_basic_test()
        self.setup_with_tools()

    @with_svn
    def setup_with_tools(self):
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        repo_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        self.repo = SM.Repository(
            name='testsvn',
            fs_path=repo_dir,
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        self.repo.refresh()
        self.rev = self.repo.commit(1)
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_url(self):
        assert self.rev.url().endswith('/1/')

    def test_primary(self):
        assert self.rev.primary() == self.rev

    def test_shorthand(self):
        assert self.rev.shorthand_id() == '[r1]'

    def test_diff(self):
        diffs = (self.rev.diffs.added
                 +self.rev.diffs.removed
                 +self.rev.diffs.changed
                 +self.rev.diffs.copied)
        for d in diffs:
            print d

    def _oid(self, rev_id):
        return '%s:%s' % (self.repo._id, rev_id)

    def test_log(self):
        # path only
        commits = list(self.repo.log(self.repo.head, id_only=True))
        assert_equal(commits, [5, 4, 3, 2, 1])
        commits = list(self.repo.log(self.repo.head, 'README', id_only=True))
        assert_equal(commits, [3, 1])
        commits = list(self.repo.log(1, 'README', id_only=True))
        assert_equal(commits, [1])
        commits = list(self.repo.log(self.repo.head, 'a/b/c/', id_only=True))
        assert_equal(commits, [4, 2])
        commits = list(self.repo.log(3, 'a/b/c/', id_only=True))
        assert_equal(commits, [2])
        assert_equal(list(self.repo.log(self.repo.head, 'does/not/exist', id_only=True)), [])

    def test_notification_email(self):
        setup_global_objects()
        h.set_context('test', 'src', neighborhood='Projects')
        repo_dir = pkg_resources.resource_filename(
            'forgesvn', 'tests/data/')
        self.repo = SM.Repository(
            name='testsvn',
            fs_path=repo_dir,
            url_path = '/test/',
            tool = 'svn',
            status = 'creating')
        self.repo.refresh()
        ThreadLocalORMSession.flush_all()
        send_notifications(self.repo, [self.repo.rev_to_commit_id(1)])
        ThreadLocalORMSession.flush_all()
        n = M.Notification.query.find(
            dict(subject='[test:src] [r1] - rick446: Create readme')).first()
        assert n
        assert_equal(n.text, 'Create readme http://localhost/p/test/src/1/')


class _Test(unittest.TestCase):
    idgen = ( 'obj_%d' % i for i in count())

    def _make_tree(self, object_id, **kwargs):
        t, isnew = M.repo.Tree.upsert(object_id)
        repo = getattr(self, 'repo', None)
        t.repo = repo
        for k,v in kwargs.iteritems():
            if isinstance(v, basestring):
                obj = M.repo.Blob(
                    t, k, self.idgen.next())
                t.blob_ids.append(Object(
                        name=k, id=obj._id))
            else:
                obj = self._make_tree(self.idgen.next(), **v)
                t.tree_ids.append(Object(
                        name=k, id=obj._id))
        session(t).flush()
        return t

    def _make_commit(self, object_id, **tree_parts):
        ci, isnew = M.repo.Commit.upsert(object_id)
        if isnew:
            ci.committed.email=c.user.email_addresses[0]
            ci.authored.email=c.user.email_addresses[0]
            dt = datetime.utcnow()
            # BSON datetime resolution is to 1 millisecond, not 1 microsecond
            # like Python. Round this now so it'll match the value that's
            # pulled from MongoDB in the tests.
            ci.authored.date = dt.replace(microsecond=dt.microsecond/1000 * 1000)
            ci.message='summary\n\nddescription'
            ci.set_context(self.repo)
            ci.tree_id = 't_' + object_id
            ci.tree = self._make_tree(ci.tree_id, **tree_parts)
        return ci, isnew

    def _make_log(self, ci):
        session(ci).flush(ci)
        rb = M.repo_refresh.CommitRunBuilder([ci._id])
        rb.run()
        rb.cleanup()

    def setUp(self):
        setup_basic_test()
        setup_global_objects()
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()
        self.prefix = tg.config.get('scm.repos.root', '/')

class _TestWithRepo(_Test):
    def setUp(self):
        super(_TestWithRepo, self).setUp()
        h.set_context('test', neighborhood='Projects')
        c.project.install_app('svn', 'test1')
        h.set_context('test', 'test1', neighborhood='Projects')
        self.repo = M.Repository(name='test1', tool='svn')
        self.repo._impl = mock.Mock(spec=M.RepositoryImplementation())
        self.repo._impl.shorthand_for_commit = M.RepositoryImplementation.shorthand_for_commit
        self.repo._impl.url_for_commit = (
            lambda *a, **kw: M.RepositoryImplementation.url_for_commit(
                self.repo._impl, *a, **kw))
        self.repo._impl._repo = self.repo
        self.repo._impl.all_commit_ids = lambda *a,**kw: []
        self.repo._impl.commit().symbolic_ids = None
        ThreadLocalORMSession.flush_all()
        # ThreadLocalORMSession.close_all()

class _TestWithRepoAndCommit(_TestWithRepo):
    def setUp(self):
        super(_TestWithRepoAndCommit, self).setUp()
        self.ci, isnew = self._make_commit('foo')
        ThreadLocalORMSession.flush_all()
        # ThreadLocalORMSession.close_all()

class TestRepo(_TestWithRepo):

    def test_create(self):
        assert self.repo.fs_path == os.path.join(self.prefix, 'svn/p/test/')
        assert self.repo.url_path == '/p/test/'
        assert self.repo.full_fs_path == os.path.join(self.prefix, 'svn/p/test/test1')

    def test_passthrough(self):
        argless = ['init']
        for fn in argless:
            getattr(self.repo, fn)()
            getattr(self.repo._impl, fn).assert_called_with()
        unary = [ 'commit', 'open_blob' ]
        for fn in unary:
            getattr(self.repo, fn)('foo')
            getattr(self.repo._impl, fn).assert_called_with('foo')

    def test_shorthand_for_commit(self):
        self.assertEqual(
            self.repo.shorthand_for_commit('a'*40),
            '[aaaaaa]')

    def test_url_for_commit(self):
        self.assertEqual(
            self.repo.url_for_commit('a'*40),
            '/p/test/test1/ci/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa/')

    @mock.patch('allura.model.repository.g.post_event')
    def test_init_as_clone(self, post_event):
        self.repo.init_as_clone('srcpath', 'srcname', 'srcurl')
        assert self.repo.upstream_repo.name == 'srcname'
        assert self.repo.upstream_repo.url == 'srcurl'
        assert self.repo._impl.clone_from.called_with('srcpath')
        post_event.assert_called_once_with('repo_cloned', 'srcurl', 'srcpath')

    def test_latest(self):
        ci = mock.Mock()
        self.repo._impl.commit = mock.Mock(return_value=ci)
        assert self.repo.latest() is ci

    def test_index(self):
        i = self.repo.index()
        assert i['type_s'] == 'Repository', i
        assert i['name_s'] == 'test1', i

    def test_scm_host_url(self):
        assert (
            self.repo.clone_url('rw', 'nobody')
            == 'svn+ssh://nobody@localhost:8022/scm-repo/p/test/test1/'),\
            self.repo.clone_url('rw', 'nobody')
        assert (
            self.repo.clone_url('https', 'nobody')
            == 'https://nobody@localhost:8022/scm-repo/p/test/test1/'),\
            self.repo.clone_url('https', 'nobody')

    def test_merge_request(self):
        M.MergeRequest.upsert(app_config_id=c.app.config._id, status='open')
        M.MergeRequest.upsert(app_config_id=c.app.config._id, status='closed')
        session(M.MergeRequest).flush()
        session(M.MergeRequest).clear()
        assert self.repo.merge_requests_by_statuses('open').count() == 1
        assert self.repo.merge_requests_by_statuses('closed').count() == 1
        assert self.repo.merge_requests_by_statuses('open', 'closed').count() == 2

    def test_guess_type(self):
        assert self.repo.guess_type('foo.txt') == ('text/plain', None)
        assert self.repo.guess_type('foo.gbaer') == ('application/octet-stream', None)
        assert self.repo.guess_type('foo.html') == ('text/html', None)
        assert self.repo.guess_type('.gitignore') == ('text/plain', None)

    def test_refresh(self):
        committer_name = 'Test Committer'
        committer_email = 'test@example.com'
        ci = mock.Mock()
        ci.authored.name = committer_name
        ci.committed.name = committer_name
        ci.committed.email = committer_email
        ci.author_url = '/u/test-committer/'
        self.repo._impl.commit = mock.Mock(return_value=ci)
        self.repo._impl.new_commits = mock.Mock(return_value=['foo%d' % i for i in range(100) ])
        self.repo._impl.all_commit_ids = mock.Mock(return_value=['foo%d' % i for i in range(100) ])
        self.repo.symbolics_for_commit = mock.Mock(return_value=[['master', 'branch'], []])
        def refresh_commit_info(oid, seen, lazy=False):
            M.repo.CommitDoc(dict(
                    authored=dict(
                        name=committer_name,
                        email=committer_email),
                    _id=oid)).m.insert()
        self.repo._impl.refresh_commit_info = refresh_commit_info
        _id = lambda oid: getattr(oid, '_id', str(oid))
        self.repo.shorthand_for_commit = lambda oid: '[' + _id(oid) + ']'
        self.repo.url_for_commit = lambda oid: '/ci/' + _id(oid) + '/'
        self.repo.refresh()
        ThreadLocalORMSession.flush_all()
        notifications = M.Notification.query.find().all()
        for n in notifications:
            if '100 new commits' in n.subject:
                assert "master,branch:  by %s http://localhost/ci/foo99" % committer_name in n.text
                break
        else:
            assert False, 'Did not find notification'
        assert M.Feed.query.find(dict(
            author_name=committer_name)).count() == 100

    def test_refresh_private(self):
        ci = mock.Mock()
        self.repo._impl.commit = mock.Mock(return_value=ci)
        self.repo._impl.new_commits = mock.Mock(return_value=['foo%d' % i for i in range(100) ])

        # make unreadable by *anonymous, so additional notification logic executes
        self.repo.acl = []
        c.project.acl = []

        self.repo.refresh()

    def test_push_upstream_context(self):
        self.repo.init_as_clone('srcpath', '/p/test/svn/', '/p/test/svn/')
        old_app_instance = M.Project.app_instance
        try:
            M.Project.app_instance = mock.Mock(return_value=ming.base.Object(
                    config=ming.base.Object(_id=None)))
            with self.repo.push_upstream_context():
                assert c.project.shortname == 'test'
        finally:
            M.Project.app_instance = old_app_instance

    def test_pending_upstream_merges(self):
        self.repo.init_as_clone('srcpath', '/p/test/svn/', '/p/test/svn/')
        old_app_instance = M.Project.app_instance
        try:
            M.Project.app_instance = mock.Mock(return_value=ming.base.Object(
                    config=ming.base.Object(_id=None)))
            self.repo.pending_upstream_merges()
        finally:
            M.Project.app_instance = old_app_instance

class TestMergeRequest(_TestWithRepoAndCommit):

    def setUp(self):
        super(TestMergeRequest, self).setUp()
        c.project.install_app('svn', 'test2')
        h.set_context('test', 'test2', neighborhood='Projects')
        self.repo2 = M.Repository(name='test2', tool='svn')
        self.repo2._impl = mock.Mock(spec=M.RepositoryImplementation())
        self.repo2._impl.log = lambda *a,**kw:(['foo'], [])
        self.repo2._impl.all_commit_ids = lambda *a,**kw: []
        self.repo2._impl._repo = self.repo2
        self.repo2.init_as_clone('/p/test/', 'test1', '/p/test/test1/')
        ThreadLocalORMSession.flush_all()
        ThreadLocalORMSession.close_all()

    def test_upsert(self):
        h.set_context('test', 'test1', neighborhood='Projects')
        mr = M.MergeRequest.upsert(
            downstream=ming.base.Object(
                project_id=c.project._id,
                mount_point='test2',
                commit_id='foo:2'),
            target_branch='foobranch',
            summary='summary',
            description='description')
        u = M.User.by_username('test-admin')
        assert_equal(mr.creator, u)
        assert_equal(mr.creator_name,  u.get_pref('display_name'))
        assert_equal(mr.creator_url,  u.url())
        assert_equal(mr.downstream_url,  '/p/test/test2/')
        assert_equal(mr.downstream_repo_url,  'http://svn.localhost/p/test/test2/')
        with mock.patch('forgesvn.model.svn.SVNLibWrapper') as _svn,\
             mock.patch('forgesvn.model.svn.SVNImplementation._map_log') as _map_log:
            mr.app.repo._impl.head = 1
            _svn().log.return_value = [mock.Mock(revision=mock.Mock(number=2))]
            _map_log.return_value = 'bar'
            assert_equal(mr.commits,  ['bar'])
            # can't do assert_called_once_with because pysvn.Revision doesn't compare nicely
            assert_equal(_svn().log.call_count, 1)
            assert_equal(_svn().log.call_args[0], ('file:///tmp/svn/p/test/test2',))
            assert_equal(_svn().log.call_args[1]['revision_start'].number, 2)
            assert_equal(_svn().log.call_args[1]['limit'], 25)
            _map_log.assert_called_once_with(_svn().log.return_value[0], 'file:///tmp/svn/p/test/test2')

class TestRepoObject(_TestWithRepoAndCommit):

    def test_upsert(self):
        obj0, isnew0 = M.repo.Tree.upsert('foo1')
        obj1, isnew1 = M.repo.Tree.upsert('foo1')
        assert obj0 is obj1
        assert isnew0 and not isnew1

    def test_artifact_methods(self):
        assert self.ci.index_id() == 'allura/model/repo/Commit#foo', self.ci.index_id()
        assert self.ci.primary() is self.ci, self.ci.primary()


class TestCommit(_TestWithRepo):

    def setUp(self):
        super(TestCommit, self).setUp()
        self.ci, isnew = self._make_commit(
            'foo',
            a=dict(
                a=dict(
                    a='',
                    b='',),
                b=''))
        self.tree = self.ci.tree
        impl = M.RepositoryImplementation()
        impl._repo = self.repo
        self.repo._impl.shorthand_for_commit = impl.shorthand_for_commit
        self.repo._impl.url_for_commit = impl.url_for_commit

    def test_upsert(self):
        obj0, isnew0 = M.repo.Commit.upsert('foo')
        obj1, isnew1 = M.repo.Commit.upsert('foo')
        assert obj0 is obj1
        assert not isnew1
        u = M.User.by_username('test-admin')
        assert self.ci.author_url == u.url()
        assert self.ci.committer_url == u.url()
        assert self.ci.tree is self.tree
        assert self.ci.summary == 'summary'
        assert self.ci.shorthand_id() == '[foo]'
        assert self.ci.url() == '/p/test/test1/ci/foo/'

    def test_get_path(self):
        b = self.ci.get_path('a/a/a')
        assert isinstance(b, M.repo.Blob)
        x = self.ci.get_path('a/a')
        assert isinstance(x, M.repo.Tree)

    def _unique_blobs(self):
        def counter():
            counter.i += 1
            return counter.i
        counter.i = 0
        blobs = defaultdict(counter)
        from cStringIO import StringIO
        return lambda blob: StringIO(str(blobs[blob.path()]))

    def test_compute_diffs(self):
        self.repo._impl.commit = mock.Mock(return_value=self.ci)
        self.repo._impl.open_blob = self._unique_blobs()
        M.repo_refresh.refresh_commit_trees(self.ci, {})
        M.repo_refresh.compute_diffs(self.repo._id, {}, self.ci)
        # self.ci.compute_diffs()
        assert_equal(self.ci.diffs.added, [ 'a', 'a/a', 'a/a/a', 'a/a/b', 'a/b' ])
        assert (self.ci.diffs.copied
                == self.ci.diffs.changed
                == self.ci.diffs.removed
                == [])
        ci, isnew = self._make_commit('bar')
        ci.parent_ids = [ 'foo' ]
        self._make_log(ci)
        M.repo_refresh.refresh_commit_trees(ci, {})
        M.repo_refresh.compute_diffs(self.repo._id, {}, ci)
        assert_equal(ci.diffs.removed, [ 'a', 'a/a', 'a/a/a', 'a/a/b', 'a/b' ])
        assert (ci.diffs.copied
                == ci.diffs.changed
                == ci.diffs.added
                == [])
        ci, isnew = self._make_commit(
            'baz',
            b=dict(
                a=dict(
                    a='',
                    b='',),
                b=''))
        ci.parent_ids = [ 'foo' ]
        self._make_log(ci)
        M.repo_refresh.refresh_commit_trees(ci, {})
        M.repo_refresh.compute_diffs(self.repo._id, {}, ci)
        assert_equal(ci.diffs.added, [ 'b', 'b/a', 'b/a/a', 'b/a/b', 'b/b' ])
        assert_equal(ci.diffs.removed, [ 'a', 'a/a', 'a/a/a', 'a/a/b', 'a/b' ])
        assert (ci.diffs.copied
                == ci.diffs.changed
                == [])

    def test_diffs_file_renames(self):
        def open_blob(blob):
            blobs = {
                u'a': u'Leia',
                u'/b/a/a': u'Darth Vader',
                u'/b/a/b': u'Luke Skywalker',
                u'/b/b': u'Death Star will destroy you',
                u'/b/c': u'Luke Skywalker',  # moved from /b/a/b
                u'/b/a/z': u'Death Star will destroy you\nALL',  # moved from /b/b and modified
            }
            from cStringIO import StringIO
            return StringIO(blobs.get(blob.path(), ''))
        self.repo._impl.open_blob = open_blob

        self.repo._impl.commit = mock.Mock(return_value=self.ci)
        M.repo_refresh.refresh_commit_trees(self.ci, {})
        M.repo_refresh.compute_diffs(self.repo._id, {}, self.ci)
        assert_equal(self.ci.diffs.added, ['a', 'a/a', 'a/a/a', 'a/a/b', 'a/b'])
        assert (self.ci.diffs.copied
                == self.ci.diffs.changed
                == self.ci.diffs.removed
                == [])

        ci, isnew = self._make_commit(
            'bar',
            b=dict(
                a=dict(
                    a='',
                    b='',),
                b=''))
        ci.parent_ids = ['foo']
        self._make_log(ci)
        M.repo_refresh.refresh_commit_trees(ci, {})
        M.repo_refresh.compute_diffs(self.repo._id, {}, ci)
        assert_equal(ci.diffs.added, ['b', 'b/a', 'b/a/a', 'b/a/b', 'b/b'])
        assert_equal(ci.diffs.removed, ['a', 'a/a', 'a/a/a', 'a/a/b', 'a/b'])
        assert (ci.diffs.copied
                == ci.diffs.changed
                == [])

        ci, isnew = self._make_commit(
            'baz',
            b=dict(
                a=dict(
                    z=''),
                c=''))
        ci.parent_ids = ['bar']
        self._make_log(ci)
        M.repo_refresh.refresh_commit_trees(ci, {})
        M.repo_refresh.compute_diffs(self.repo._id, {}, ci)
        assert_equal(ci.diffs.added, [])
        assert_equal(ci.diffs.changed, [])
        assert_equal(ci.diffs.removed, ['b/a/a'])
        # see mock for open_blob
        assert_equal(len(ci.diffs.copied), 2)
        assert_equal(ci.diffs.copied[0]['old'], 'b/a/b')
        assert_equal(ci.diffs.copied[0]['new'], 'b/c')
        assert_equal(ci.diffs.copied[0]['ratio'], 1)
        assert_equal(ci.diffs.copied[0]['diff'], '')
        assert_equal(ci.diffs.copied[1]['old'], 'b/b')
        assert_equal(ci.diffs.copied[1]['new'], 'b/a/z')
        assert ci.diffs.copied[1]['ratio'] < 1, ci.diffs.copied[1]['ratio']
        assert '+++' in ci.diffs.copied[1]['diff'], ci.diffs.copied[1]['diff']

    def test_context(self):
        self.ci.context()
