#       Licensed to the Apache Software Foundation (ASF) under one
#       or more contributor license agreements.  See the NOTICE file
#       distributed with this work for additional information
#       regarding copyright ownership.  The ASF licenses this file
#       to you under the Apache License, Version 2.0 (the
#       "License"); you may not use this file except in compliance
#       with the License.  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#       Unless required by applicable law or agreed to in writing,
#       software distributed under the License is distributed on an
#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#       KIND, either express or implied.  See the License for the
#       specific language governing permissions and limitations
#       under the License.

from datetime import datetime
from collections import defaultdict, OrderedDict

import unittest
import mock
from nose.tools import assert_equal
from pylons import tmpl_context as c
from bson import ObjectId
from ming.orm import session
from tg import config

from alluratest.controller import setup_basic_test, setup_global_objects
from allura import model as M
from allura.lib import helpers as h


class TestGitLikeTree(object):
    def test_set_blob(self):
        tree = M.GitLikeTree()
        tree.set_blob('/dir/dir2/file', 'file-oid')

        assert_equal(tree.blobs, {})
        assert_equal(tree.get_tree('dir').blobs, {})
        assert_equal(tree.get_tree('dir').get_tree('dir2')
                     .blobs, {'file': 'file-oid'})

    def test_hex(self):
        tree = M.GitLikeTree()
        tree.set_blob('/dir/dir2/file', 'file-oid')
        hex = tree.hex()

        # check the reprs. In case hex (below) fails, this'll be useful
        assert_equal(repr(tree.get_tree('dir').get_tree('dir2')),
                     'b file-oid file')
        assert_equal(repr(tree),
                     't 96af1772ecce1e6044e6925e595d9373ffcd2615 dir')
        # the hex() value shouldn't change, it's an important key
        assert_equal(hex, '4abba29a43411b9b7cecc1a74f0b27920554350d')

        # another one should be the same
        tree2 = M.GitLikeTree()
        tree2.set_blob('/dir/dir2/file', 'file-oid')
        hex2 = tree2.hex()
        assert_equal(hex, hex2)


class RepoImplTestBase(object):
    def test_commit_run(self):
        M.repository.CommitRunDoc.m.remove()
        commit_ids = list(self.repo.all_commit_ids())
        # simulate building up a commit run from multiple pushes
        for c_id in commit_ids:
            crb = M.repo_refresh.CommitRunBuilder([c_id])
            crb.run()
            crb.cleanup()
        runs = M.repository.CommitRunDoc.m.find().all()
        self.assertEqual(len(runs), 1)
        run = runs[0]
        self.assertEqual(run.commit_ids, commit_ids)
        self.assertEqual(len(run.commit_ids), len(run.commit_times))
        self.assertEqual(run.parent_commit_ids, [])

    def test_repair_commit_run(self):
        commit_ids = list(self.repo.all_commit_ids())
        # simulate building up a commit run from multiple pushes, but skip the
        # last commit to simulate a broken commit run
        for c_id in commit_ids[:-1]:
            crb = M.repo_refresh.CommitRunBuilder([c_id])
            crb.run()
            crb.cleanup()
        # now repair the commitrun by rebuilding with all commit ids
        crb = M.repo_refresh.CommitRunBuilder(commit_ids)
        crb.run()
        crb.cleanup()
        runs = M.repository.CommitRunDoc.m.find().all()
        self.assertEqual(len(runs), 1)
        run = runs[0]
        self.assertEqual(run.commit_ids, commit_ids)
        self.assertEqual(len(run.commit_ids), len(run.commit_times))
        self.assertEqual(run.parent_commit_ids, [])


class RepoTestBase(unittest.TestCase):
    def setUp(self):
        setup_basic_test()

    @mock.patch('allura.model.repository.Repository.url')
    def test_refresh_url(self, url):
        url.return_value = '/p/test/repo'
        c.app = mock.Mock(**{'config._id': 'deadbeef'})
        repo = M.repository.Repository()
        cases = [
            [
                None,
                'http://localhost:8080/auth/refresh_repo/p/test/repo',
            ],
            [
                'https://somewhere.com',
                'https://somewhere.com/auth/refresh_repo/p/test/repo',
            ],
            [
                'http://somewhere.com/',
                'http://somewhere.com/auth/refresh_repo/p/test/repo',
            ]]
        for base_url, result in cases:
            values = {}
            if base_url:
                values['base_url'] = base_url
            with mock.patch.dict(config, values, clear=True):
                self.assertEqual(result, repo.refresh_url())


class TestLastCommit(unittest.TestCase):
    def setUp(self):
        setup_basic_test()
        setup_global_objects()
        self.repo = mock.Mock(
            name='repo',
            _commits=OrderedDict(),
            _last_commit=None,
            spec=M.Repository)
        self.repo.paged_diffs.return_value = {
            'added': [],
            'removed': [],
            'copied': [],
            'renamed': [],
            'changed': [],
            'total': 0,
        }
        self.repo.shorthand_for_commit = lambda _id: _id[:6]
        self.repo.rev_to_commit_id = lambda rev: rev
        self.repo.log = self._log
        self._changes = defaultdict(list)
        self.repo.get_changes = lambda _id: self._changes[_id]
        self._last_commits = [(None, set())]
        self.repo._get_last_commit = lambda i, p: self._last_commits.pop()
        lcids = M.repository.RepositoryImplementation.last_commit_ids.__func__
        self.repo.last_commit_ids = lambda *a, **k: lcids(self.repo, *a, **k)
        c.lcid_cache = {}

    def _build_tree(self, commit, path, tree_paths):
        tree_nodes = []
        blob_nodes = []
        sub_paths = defaultdict(list)

        def n(p):
            m = mock.Mock()
            m.name = p
            return m

        for p in tree_paths:
            if '/' in p:
                node, sub = p.split('/', 1)
                if node not in sub_paths:
                    tree_nodes.append(n(node))
                sub_paths[node].append(sub)
            else:
                blob_nodes.append(n(p))
        tree = mock.Mock(
            commit=commit,
            path=mock.Mock(return_value=path),
            tree_ids=tree_nodes,
            blob_ids=blob_nodes,
            other_ids=[],
            repo=self.repo,
        )
        tree.get_obj_by_path = lambda p: self._build_tree(
            commit, p, sub_paths[p])
        tree.__getitem__ = lambda s, p: self._build_tree(
            commit, p, sub_paths[p])
        return tree

    def _add_commit(self, msg, tree_paths, diff_paths=None, parents=[]):
        suser = dict(
            name='test',
            email='test@example.com',
            date=datetime(2013, 1, 1 + len(self.repo._commits)),
        )
        commit = M.repository.Commit(
            _id=str(ObjectId()),
            message=msg,
            parent_ids=[parent._id for parent in parents],
            commited=suser,
            authored=suser,
            repo=self.repo,
        )
        commit.tree = self._build_tree(commit, '/', tree_paths)
        commit.get_tree = lambda c: commit.tree
        self._changes[commit._id].extend(diff_paths or tree_paths)
        self.repo._commits[commit._id] = commit
        self._last_commits.append((commit._id, set(diff_paths or tree_paths)))
        return commit

    def _log(self, revs, path, id_only=True, limit=None):
        for commit_id, commit in reversed(self.repo._commits.items()):
            if path in commit.changed_paths:
                yield commit_id

    def test_single_commit(self):
        commit1 = self._add_commit('Commit 1', [
            'file1',
            'dir1/file2',
        ])
        lcd = M.repository.LastCommit.get(commit1.tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit1.message)
        self.assertEqual(lcd.path, '')
        self.assertEqual(len(lcd.entries), 2)
        self.assertEqual(lcd.by_name['file1'], commit1._id)
        self.assertEqual(lcd.by_name['dir1'], commit1._id)

    def test_multiple_commits_no_overlap(self):
        commit1 = self._add_commit('Commit 1', ['file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1'], ['dir1/file1'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'file2'], ['file2'], [commit2])
        lcd = M.repository.LastCommit.get(commit3.tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit3.message)
        self.assertEqual(lcd.commit_id, commit3._id)
        self.assertEqual(lcd.path, '')
        self.assertEqual(len(lcd.entries), 3)
        self.assertEqual(lcd.by_name['file1'], commit1._id)
        self.assertEqual(lcd.by_name['dir1'], commit2._id)
        self.assertEqual(lcd.by_name['file2'], commit3._id)

    def test_multiple_commits_with_overlap(self):
        commit1 = self._add_commit('Commit 1', ['file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1'], ['dir1/file1'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'file2'], ['file1', 'file2'], [commit2])
        lcd = M.repository.LastCommit.get(commit3.tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit3.message)
        self.assertEqual(lcd.path, '')
        self.assertEqual(len(lcd.entries), 3)
        self.assertEqual(lcd.by_name['file1'], commit3._id)
        self.assertEqual(lcd.by_name['dir1'], commit2._id)
        self.assertEqual(lcd.by_name['file2'], commit3._id)

    def test_multiple_commits_subdir_change(self):
        commit1 = self._add_commit('Commit 1', ['file1', 'dir1/file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1', 'dir1/file2'], ['dir1/file2'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'dir1/file2'], ['dir1/file1'], [commit2])
        lcd = M.repository.LastCommit.get(commit3.tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit3.message)
        self.assertEqual(lcd.path, '')
        self.assertEqual(len(lcd.entries), 2)
        self.assertEqual(lcd.by_name['file1'], commit1._id)
        self.assertEqual(lcd.by_name['dir1'], commit3._id)

    def test_subdir_lcd(self):
        commit1 = self._add_commit('Commit 1', ['file1', 'dir1/file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1', 'dir1/file2'], ['dir1/file2'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'dir1/file2'], ['dir1/file1'], [commit2])
        tree = self._build_tree(commit3, '/dir1', ['file1', 'file2'])
        lcd = M.repository.LastCommit.get(tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit3.message)
        self.assertEqual(lcd.path, 'dir1')
        self.assertEqual(len(lcd.entries), 2)
        self.assertEqual(lcd.by_name['file1'], commit3._id)
        self.assertEqual(lcd.by_name['file2'], commit2._id)

    def test_subdir_lcd_prev_commit(self):
        commit1 = self._add_commit('Commit 1', ['file1', 'dir1/file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1', 'dir1/file2'], ['dir1/file2'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'dir1/file2'], ['dir1/file1'], [commit2])
        commit4 = self._add_commit('Commit 4', ['file1', 'dir1/file1', 'dir1/file2', 'file2'], ['file2'], [commit3])
        tree = self._build_tree(commit4, '/dir1', ['file1', 'file2'])
        lcd = M.repository.LastCommit.get(tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit3.message)
        self.assertEqual(lcd.path, 'dir1')
        self.assertEqual(len(lcd.entries), 2)
        self.assertEqual(lcd.by_name['file1'], commit3._id)
        self.assertEqual(lcd.by_name['file2'], commit2._id)

    def test_subdir_lcd_always_empty(self):
        commit1 = self._add_commit('Commit 1', ['file1', 'dir1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'file2'], ['file2'], [commit1])
        tree = self._build_tree(commit2, '/dir1', [])
        lcd = M.repository.LastCommit.get(tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit1.message)
        self.assertEqual(lcd.path, 'dir1')
        self.assertEqual(lcd.entries, [])

    def test_subdir_lcd_emptied(self):
        commit1 = self._add_commit('Commit 1', ['file1', 'dir1/file1'])
        commit2 = self._add_commit('Commit 2', ['file1'], ['dir1/file1'], [commit1])
        tree = self._build_tree(commit2, '/dir1', [])
        lcd = M.repository.LastCommit.get(tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit2.message)
        self.assertEqual(lcd.path, 'dir1')
        self.assertEqual(lcd.entries, [])

    def test_existing_lcd_unchained(self):
        commit1 = self._add_commit('Commit 1', ['file1', 'dir1/file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1', 'dir1/file2'], ['dir1/file2'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'dir1/file2'], ['file1'], [commit2])
        prev_lcd = M.repository.LastCommit(
            path='dir1',
            commit_id=commit2._id,
            entries=[
                dict(
                    name='file1',
                    commit_id=commit1._id),
                dict(
                    name='file2',
                    commit_id=commit2._id),
            ],
        )
        session(prev_lcd).flush()
        tree = self._build_tree(commit3, '/dir1', ['file1', 'file2'])
        lcd = M.repository.LastCommit.get(tree)
        self.assertEqual(lcd._id, prev_lcd._id)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit2.message)
        self.assertEqual(lcd.path, 'dir1')
        self.assertEqual(lcd.entries, prev_lcd.entries)

    def test_existing_lcd_partial(self):
        commit1 = self._add_commit('Commit 1', ['file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'file2'], ['file2'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'file2', 'file3'], ['file3'], [commit2])
        commit4 = self._add_commit('Commit 4', ['file1', 'file2', 'file3', 'file4'], ['file2', 'file4'], [commit3])
        prev_lcd = M.repository.LastCommit(
            path='',
            commit_id=commit3._id,
            entries=[
                dict(
                    name='file1',
                    commit_id=commit1._id),
                dict(
                    name='file2',
                    commit_id=commit2._id),
                dict(
                    name='file3',
                    commit_id=commit3._id),
            ],
        )
        session(prev_lcd).flush()
        lcd = M.repository.LastCommit.get(commit4.tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit4.message)
        self.assertEqual(lcd.path, '')
        self.assertEqual(len(lcd.entries), 4)
        self.assertEqual(lcd.by_name['file1'], commit1._id)
        self.assertEqual(lcd.by_name['file2'], commit4._id)
        self.assertEqual(lcd.by_name['file3'], commit3._id)
        self.assertEqual(lcd.by_name['file4'], commit4._id)

    def test_missing_add_record(self):
        self._add_commit('Commit 1', ['file1'])
        commit2 = self._add_commit('Commit 2', ['file2'])
        commit2.changed_paths = []
        result = self.repo.last_commit_ids(commit2, ['file2'])
        assert_equal(result, {'file2': commit2._id})

    def test_missing_add_record_first_commit(self):
        commit1 = self._add_commit('Commit 1', ['file1'])
        commit1.changed_paths = []
        result = self.repo.last_commit_ids(commit1, ['file1'])
        assert_equal(result, {'file1': commit1._id})

    def test_timeout(self):
        commit1 = self._add_commit('Commit 1', ['file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1'], ['dir1/file1'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'file2'], ['file2'], [commit2])
        with h.push_config(config, lcd_timeout=-1000):
            lcd = M.repository.LastCommit.get(commit3.tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit3.message)
        self.assertEqual(lcd.commit_id, commit3._id)
        self.assertEqual(lcd.path, '')
        self.assertEqual(len(lcd.entries), 1)
        self.assertEqual(lcd.by_name['file2'], commit3._id)

    def test_loop(self):
        commit1 = self._add_commit('Commit 1', ['file1'])
        commit2 = self._add_commit('Commit 2', ['file1', 'dir1/file1'], ['dir1/file1'], [commit1])
        commit3 = self._add_commit('Commit 3', ['file1', 'dir1/file1', 'file2'], ['file2'], [commit2])
        commit2.parent_ids = [commit3._id]
        session(commit2).flush(commit2)
        lcd = M.repository.LastCommit.get(commit3.tree)
        self.assertEqual(self.repo._commits[lcd.commit_id].message, commit3.message)
        self.assertEqual(lcd.commit_id, commit3._id)
        self.assertEqual(lcd.path, '')
        self.assertEqual(len(lcd.entries), 3)
        self.assertEqual(lcd.by_name['dir1'], commit2._id)
        self.assertEqual(lcd.by_name['file2'], commit3._id)


class TestModelCache(unittest.TestCase):
    def setUp(self):
        self.cache = M.repository.ModelCache()

    def test_normalize_query(self):
        self.assertEqual(self.cache._normalize_query(
            {'foo': 1, 'bar': 2}), (('bar', 2), ('foo', 1)))

    def test_model_query(self):
        q = mock.Mock(spec_set=['query'], query='foo')
        m = mock.Mock(spec_set=['m'], m='bar')
        n = mock.Mock(spec_set=['foo'], foo='qux')
        self.assertEquals(self.cache._model_query(q), 'foo')
        self.assertEquals(self.cache._model_query(m), 'bar')
        self.assertRaises(AttributeError, self.cache._model_query, [n])

    @mock.patch.object(M.repository.Tree.query, 'get')
    @mock.patch.object(M.repository.LastCommit.query, 'get')
    def test_get(self, lc_get, tr_get):
        tree = tr_get.return_value = mock.Mock(
            spec=['_id', 'val'], _id='foo', val='bar')
        lcd = lc_get.return_value = mock.Mock(
            spec=['_id', 'val'], _id='foo', val='qux')

        val = self.cache.get(M.repository.Tree, {'_id': 'foo'})
        tr_get.assert_called_with(_id='foo')
        self.assertEqual(val, tree)

        val = self.cache.get(M.repository.LastCommit, {'_id': 'foo'})
        lc_get.assert_called_with(_id='foo')
        self.assertEqual(val, lcd)

    @mock.patch.object(M.repository.Tree.query, 'get')
    def test_get_no_query(self, tr_get):
        tree1 = tr_get.return_value = mock.Mock(
            spec=['_id', 'val'], _id='foo', val='bar')
        val = self.cache.get(M.repository.Tree, {'_id': 'foo'})
        tr_get.assert_called_once_with(_id='foo')
        self.assertEqual(val, tree1)

        tree2 = tr_get.return_value = mock.Mock(_id='foo', val='qux')
        val = self.cache.get(M.repository.Tree, {'_id': 'foo'})
        tr_get.assert_called_once_with(_id='foo')
        self.assertEqual(val, tree1)

    def test_set(self):
        tree = mock.Mock(spec=['_id', 'test_set'], _id='foo', val='test_set')
        self.cache.set(M.repository.Tree, {'val': 'test_set'}, tree)
        self.assertEqual(self.cache._query_cache,
                         {M.repository.Tree: {(('val', 'test_set'),): 'foo'}})
        self.assertEqual(self.cache._instance_cache,
                         {M.repository.Tree: {'foo': tree}})

    @mock.patch('bson.ObjectId')
    def test_set_none_id(self, obj_id):
        obj_id.return_value = 'OBJID'
        tree = mock.Mock(spec=['_id', 'test_set'], _id=None, val='test_set')
        self.cache.set(M.repository.Tree, {'val1': 'test_set1'}, tree)
        self.cache.set(M.repository.Tree, {'val2': 'test_set2'}, tree)
        self.assertEqual(dict(self.cache._query_cache[M.repository.Tree]), {
            (('val1', 'test_set1'),): 'OBJID',
            (('val2', 'test_set2'),): 'OBJID',
        })
        self.assertEqual(self.cache._instance_cache,
                         {M.repository.Tree: {'OBJID': tree}})
        tree._id = '_id'
        self.assertEqual(self.cache.get(M.repository.Tree, {'val1': 'test_set1'}), tree)
        self.assertEqual(self.cache.get(M.repository.Tree, {'val2': 'test_set2'}), tree)
        self.cache.set(M.repository.Tree, {'val1': 'test_set2'}, tree)
        self.assertEqual(self.cache.get(M.repository.Tree, {'val1': 'test_set1'}), tree)
        self.assertEqual(self.cache.get(M.repository.Tree, {'val2': 'test_set2'}), tree)

    @mock.patch('bson.ObjectId')
    def test_set_none_val(self, obj_id):
        obj_id.return_value = 'OBJID'
        self.cache.set(M.repository.Tree, {'val1': 'test_set1'}, None)
        self.cache.set(M.repository.Tree, {'val2': 'test_set2'}, None)
        self.assertEqual(dict(self.cache._query_cache[M.repository.Tree]), {
            (('val1', 'test_set1'),): None,
            (('val2', 'test_set2'),): None,
        })
        self.assertEqual(dict(self.cache._instance_cache[M.repository.Tree]), {})
        tree1 = mock.Mock(spec=['_id', 'val'], _id='tree1', val='test_set')
        tree2 = mock.Mock(spec=['_model_cache_id', '_id', 'val'],
                          _model_cache_id='tree2', _id='tree1', val='test_set2')
        self.cache.set(M.repository.Tree, {'val1': 'test_set1'}, tree1)
        self.cache.set(M.repository.Tree, {'val2': 'test_set2'}, tree2)
        self.assertEqual(dict(self.cache._query_cache[M.repository.Tree]), {
            (('val1', 'test_set1'),): 'tree1',
            (('val2', 'test_set2'),): 'tree2',
        })
        self.assertEqual(dict(self.cache._instance_cache[M.repository.Tree]), {
            'tree1': tree1,
            'tree2': tree2,
        })

    def test_instance_ids(self):
        tree1 = mock.Mock(spec=['_id', 'val'], _id='id1', val='tree1')
        tree2 = mock.Mock(spec=['_id', 'val'], _id='id2', val='tree2')
        self.cache.set(M.repository.Tree, {'val': 'tree1'}, tree1)
        self.cache.set(M.repository.Tree, {'val': 'tree2'}, tree2)
        self.assertEqual(set(self.cache.instance_ids(M.repository.Tree)),
                         set(['id1', 'id2']))
        self.assertEqual(self.cache.instance_ids(M.repository.LastCommit), [])

    @mock.patch.object(M.repository.Tree.query, 'find')
    def test_batch_load(self, tr_find):
        # cls, query, attrs
        m1 = mock.Mock(spec=['_id', 'foo', 'qux'], _id='id1', foo=1, qux=3)
        m2 = mock.Mock(spec=['_id', 'foo', 'qux'], _id='id2', foo=2, qux=5)
        tr_find.return_value = [m1, m2]

        self.cache.batch_load(M.repository.Tree, {'foo': {'$in': 'bar'}})
        tr_find.assert_called_with({'foo': {'$in': 'bar'}})
        self.assertEqual(self.cache._query_cache[M.repository.Tree], {
            (('foo', 1),): 'id1',
            (('foo', 2),): 'id2',
        })
        self.assertEqual(self.cache._instance_cache[M.repository.Tree], {
            'id1': m1,
            'id2': m2,
        })

    @mock.patch.object(M.repository.Tree.query, 'find')
    def test_batch_load_attrs(self, tr_find):
        # cls, query, attrs
        m1 = mock.Mock(spec=['_id', 'foo', 'qux'], _id='id1', foo=1, qux=3)
        m2 = mock.Mock(spec=['_id', 'foo', 'qux'], _id='id2', foo=2, qux=5)
        tr_find.return_value = [m1, m2]

        self.cache.batch_load(M.repository.Tree, {'foo': {'$in': 'bar'}}, ['qux'])
        tr_find.assert_called_with({'foo': {'$in': 'bar'}})
        self.assertEqual(self.cache._query_cache[M.repository.Tree], {
            (('qux', 3),): 'id1',
            (('qux', 5),): 'id2',
        })
        self.assertEqual(self.cache._instance_cache[M.repository.Tree], {
            'id1': m1,
            'id2': m2,
        })

    def test_pruning(self):
        cache = M.repository.ModelCache(max_queries=3, max_instances=2)
        # ensure cache expires as LRU
        tree1 = mock.Mock(spec=['_id', '_val'], _id='foo', val='bar')
        tree2 = mock.Mock(spec=['_id', '_val'], _id='qux', val='fuz')
        tree3 = mock.Mock(spec=['_id', '_val'], _id='f00', val='b4r')
        tree4 = mock.Mock(spec=['_id', '_val'], _id='foo', val='zaz')
        cache.set(M.repository.Tree, {'_id': 'foo'}, tree1)
        cache.set(M.repository.Tree, {'_id': 'qux'}, tree2)
        cache.set(M.repository.Tree, {'_id': 'f00'}, tree3)
        cache.set(M.repository.Tree, {'_id': 'foo'}, tree4)
        cache.get(M.repository.Tree, {'_id': 'f00'})
        cache.set(M.repository.Tree, {'val': 'b4r'}, tree3)
        self.assertEqual(cache._query_cache, {
            M.repository.Tree: {
                (('_id', 'foo'),): 'foo',
                (('_id', 'f00'),): 'f00',
                (('val', 'b4r'),): 'f00',
            },
        })
        self.assertEqual(cache._instance_cache, {
            M.repository.Tree: {
                'f00': tree3,
                'foo': tree4,
            },
        })

    def test_pruning_query_vs_instance(self):
        cache = M.repository.ModelCache(max_queries=3, max_instances=2)
        # ensure cache expires as LRU
        tree1 = mock.Mock(spec=['_id', '_val'], _id='keep', val='bar')
        tree2 = mock.Mock(spec=['_id', '_val'], _id='tree2', val='fuz')
        tree3 = mock.Mock(spec=['_id', '_val'], _id='tree3', val='b4r')
        tree4 = mock.Mock(spec=['_id', '_val'], _id='tree4', val='zaz')
        cache.set(M.repository.Tree, {'keep_query_1': 'bar'}, tree1)
        cache.set(M.repository.Tree, {'drop_query_1': 'bar'}, tree2)
        # should refresh tree1 in _instance_cache
        cache.set(M.repository.Tree, {'keep_query_2': 'bar'}, tree1)
        # should drop tree2, not tree1, from _instance_cache
        cache.set(M.repository.Tree, {'drop_query_2': 'bar'}, tree3)
        self.assertEqual(cache._query_cache[M.repository.Tree], {
            (('drop_query_1', 'bar'),): 'tree2',
            (('keep_query_2', 'bar'),): 'keep',
            (('drop_query_2', 'bar'),): 'tree3',
        })
        self.assertEqual(cache._instance_cache[M.repository.Tree], {
            'keep': tree1,
            'tree3': tree3,
        })

    @mock.patch('bson.ObjectId')
    def test_pruning_no_id(self, obj_id):
        obj_id.side_effect = ['id1', 'id2', 'id3']
        cache = M.repository.ModelCache(max_queries=3, max_instances=2)
        # ensure cache considers same instance equal to itself, even if no _id
        tree1 = mock.Mock(spec=['val'], val='bar')
        cache.set(M.repository.Tree, {'query_1': 'bar'}, tree1)
        cache.set(M.repository.Tree, {'query_2': 'bar'}, tree1)
        cache.set(M.repository.Tree, {'query_3': 'bar'}, tree1)
        self.assertEqual(cache._instance_cache[M.repository.Tree], {
            'id1': tree1,
        })
        self.assertEqual(cache._query_cache[M.repository.Tree], {
            (('query_1', 'bar'),): 'id1',
            (('query_2', 'bar'),): 'id1',
            (('query_3', 'bar'),): 'id1',
        })

    @mock.patch('bson.ObjectId')
    def test_pruning_none(self, obj_id):
        obj_id.side_effect = ['id1', 'id2', 'id3']
        cache = M.repository.ModelCache(max_queries=3, max_instances=2)
        # ensure cache doesn't store None instances
        cache.set(M.repository.Tree, {'query_1': 'bar'}, None)
        cache.set(M.repository.Tree, {'query_2': 'bar'}, None)
        cache.set(M.repository.Tree, {'query_3': 'bar'}, None)
        self.assertEqual(cache._instance_cache[M.repository.Tree], {})
        self.assertEqual(cache._query_cache[M.repository.Tree], {
            (('query_1', 'bar'),): None,
            (('query_2', 'bar'),): None,
            (('query_3', 'bar'),): None,
        })

    @mock.patch('allura.model.repository.session')
    @mock.patch.object(M.repository.Tree.query, 'get')
    def test_pruning_query_flush(self, tr_get, session):
        cache = M.repository.ModelCache(max_queries=3, max_instances=2)
        # ensure cache doesn't store None instances
        tree1 = mock.Mock(name='tree1',
                          spec=['_id', '_val'], _id='tree1', val='bar')
        tree2 = mock.Mock(name='tree2',
                          spec=['_id', '_val'], _id='tree2', val='fuz')
        tr_get.return_value = tree2
        cache.set(M.repository.Tree, {'_id': 'tree1'}, tree1)
        cache.set(M.repository.Tree, {'_id': 'tree2'}, tree2)
        cache.get(M.repository.Tree, {'query_1': 'tree2'})
        cache.get(M.repository.Tree, {'query_2': 'tree2'})
        cache.get(M.repository.Tree, {'query_3': 'tree2'})
        self.assertEqual(cache._query_cache[M.repository.Tree], {
            (('query_1', 'tree2'),): 'tree2',
            (('query_2', 'tree2'),): 'tree2',
            (('query_3', 'tree2'),): 'tree2',
        })
        self.assertEqual(cache._instance_cache[M.repository.Tree], {
            'tree1': tree1,
            'tree2': tree2,
        })
        self.assertEqual(session.call_args_list,
                         [mock.call(tree1), mock.call(tree2)])
        self.assertEqual(session.return_value.flush.call_args_list,
                         [mock.call(tree1), mock.call(tree2)])
        assert not session.return_value.expunge.called

    @mock.patch('allura.model.repository.session')
    def test_pruning_instance_flush(self, session):
        cache = M.repository.ModelCache(max_queries=3, max_instances=2)
        # ensure cache doesn't store None instances
        tree1 = mock.Mock(spec=['_id', '_val'], _id='tree1', val='bar')
        tree2 = mock.Mock(spec=['_id', '_val'], _id='tree2', val='fuz')
        tree3 = mock.Mock(spec=['_id', '_val'], _id='tree3', val='qux')
        cache.set(M.repository.Tree, {'_id': 'tree1'}, tree1)
        cache.set(M.repository.Tree, {'_id': 'tree2'}, tree2)
        cache.set(M.repository.Tree, {'_id': 'tree3'}, tree3)
        self.assertEqual(cache._query_cache[M.repository.Tree], {
            (('_id', 'tree1'),): 'tree1',
            (('_id', 'tree2'),): 'tree2',
            (('_id', 'tree3'),): 'tree3',
        })
        self.assertEqual(cache._instance_cache[M.repository.Tree], {
            'tree2': tree2,
            'tree3': tree3,
        })
        session.assert_called_once_with(tree1)
        session.return_value.flush.assert_called_once_with(tree1)
        session.return_value.expunge.assert_called_once_with(tree1)


class TestMergeRequest(object):
    def setUp(self):
        setup_basic_test()
        setup_global_objects()
        self.mr = M.MergeRequest(
            app_config=mock.Mock(_id=ObjectId()),
            downstream={'commit_id': '12345'},
            request_number=1,
        )
        self.mr.app = mock.Mock(forkable=True, url='/mock-app-url/')
        self.mr.app.repo.commit.return_value = mock.Mock(_id='09876')
        self.mr.merge_allowed = mock.Mock(return_value=True)
        self.mr.discussion_thread = mock.Mock()

    def test_can_merge_cache_key(self):
        key = self.mr.can_merge_cache_key()
        assert_equal(key, '12345-09876')

    def test_get_can_merge_cache(self):
        key = self.mr.can_merge_cache_key()
        assert_equal(self.mr.get_can_merge_cache(), None)
        self.mr.can_merge_cache[key] = True
        assert_equal(self.mr.get_can_merge_cache(), True)

        self.mr.can_merge_cache_key = lambda: '123-123'
        self.mr.can_merge_cache['123-123'] = False
        assert_equal(self.mr.get_can_merge_cache(), False)

    def test_set_can_merge_cache(self):
        key = self.mr.can_merge_cache_key()
        assert_equal(self.mr.can_merge_cache, {})
        self.mr.set_can_merge_cache(True)
        assert_equal(self.mr.can_merge_cache, {key: True})

        self.mr.can_merge_cache_key = lambda: '123-123'
        self.mr.set_can_merge_cache(False)
        assert_equal(self.mr.can_merge_cache, {key: True, '123-123': False})

    def test_can_merge_merged(self):
        self.mr.status = 'merged'
        assert_equal(self.mr.can_merge(), True)

    @mock.patch('allura.tasks.repo_tasks.can_merge', autospec=True)
    def test_can_merge_cached(self, can_merge_task):
        self.mr.set_can_merge_cache(False)
        assert_equal(self.mr.can_merge(), False)
        self.mr.set_can_merge_cache(True)
        assert_equal(self.mr.can_merge(), True)
        assert_equal(can_merge_task.post.call_count, 0)

    @mock.patch('allura.tasks.repo_tasks.can_merge', autospec=True)
    def test_can_merge_not_cached(self, can_merge_task):
        assert_equal(self.mr.can_merge(), None)
        can_merge_task.post.assert_called_once_with(self.mr._id)

    @mock.patch('allura.tasks.repo_tasks.can_merge', autospec=True)
    def test_can_merge_disabled(self, can_merge_task):
        self.mr.merge_allowed.return_value = False
        assert_equal(self.mr.can_merge(), None)
        assert_equal(can_merge_task.post.call_count, 0)

    @mock.patch('allura.tasks.repo_tasks.merge', autospec=True)
    def test_merge(self, merge_task):
        self.mr.merge_task_status = lambda: None
        self.mr.merge()
        merge_task.post.assert_called_once_with(self.mr._id)

        merge_task.reset_mock()
        self.mr.merge_task_status = lambda: 'ready'
        self.mr.merge()
        assert_equal(merge_task.post.called, False)

    def test_merge_task_status(self):
        from allura.tasks import repo_tasks
        assert_equal(self.mr.merge_task_status(), None)
        repo_tasks.merge.post(self.mr._id)
        assert_equal(self.mr.merge_task_status(), 'ready')
        M.MonQTask.run_ready()
        assert_equal(self.mr.merge_task_status(), 'complete')

    def test_can_merge_task_status(self):
        from allura.tasks import repo_tasks
        assert_equal(self.mr.can_merge_task_status(), None)
        repo_tasks.can_merge.post(self.mr._id)
        assert_equal(self.mr.can_merge_task_status(), 'ready')
        with mock.patch('allura.model.repository.MergeRequest.set_can_merge_cache'):
            M.MonQTask.run_ready()
        assert_equal(self.mr.can_merge_task_status(), 'complete')
