| # |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # |
| import unittest, os, weakref, setup_path, utils |
| |
| from svn import core, client, wc |
| |
| try: |
| # Python >=3.0 |
| from urllib.parse import urljoin |
| except ImportError: |
| # Python <3.0 |
| from urlparse import urljoin |
| |
| class SubversionClientTestCase(unittest.TestCase): |
| """Test cases for the basic SWIG Subversion client layer""" |
| |
| def assert_all_instances_of(self, iterable, instancetype): |
| """Asserts that all object from iterable are an instance of instancetype.""" |
| |
| self.assertTrue(not [x for x in iterable if not isinstance(x, instancetype)]) |
| |
| def log_message_func(self, items, pool): |
| """ Simple log message provider for unit tests. """ |
| self.log_message_func_calls += 1 |
| return b"Test log message" |
| |
| def make_log_message_func(self, message): |
| def generic_log_message_func(items, pool): |
| self.log_message_func_calls += 1 |
| return message |
| return generic_log_message_func |
| |
| def log_receiver(self, changed_paths, revision, author, date, message, pool): |
| """ Function to receive log messages retrieved by client.log3(). """ |
| self.log_message = message |
| self.change_author = author |
| self.changed_paths = changed_paths |
| |
| def log_entry_receiver(self, log_entry, pool): |
| """An implementation of svn_log_entry_receiver_t.""" |
| self.received_revisions.append(log_entry.revision) |
| |
| def setUp(self): |
| """Set up authentication and client context""" |
| self.client_ctx = client.svn_client_create_context() |
| self.assertEqual(self.client_ctx.log_msg_baton2, None) |
| self.assertEqual(self.client_ctx.log_msg_func2, None) |
| self.assertEqual(self.client_ctx.log_msg_baton3, None) |
| self.assertEqual(self.client_ctx.log_msg_func3, None) |
| self.client_ctx.log_msg_func3 = client.svn_swig_py_get_commit_log_func |
| self.client_ctx.log_msg_baton3 = self.log_message_func |
| self.log_message_func_calls = 0 |
| self.log_message = None |
| self.changed_paths = None |
| self.change_author = None |
| |
| providers = [ |
| client.svn_client_get_simple_provider(), |
| client.svn_client_get_username_provider(), |
| ] |
| |
| self.client_ctx.auth_baton = core.svn_auth_open(providers) |
| |
| self.temper = utils.Temper() |
| (_, self.repos_path, self.repos_uri) = self.temper.alloc_known_repo( |
| 'trac/versioncontrol/tests/svnrepos.dump', suffix='-client') |
| |
| def tearDown(self): |
| # We have to free client_ctx first, since it may be holding handles |
| # to WC DBs |
| del self.client_ctx |
| self.temper.cleanup() |
| |
| def testBatonPlay(self): |
| """Test playing with C batons""" |
| baton = lambda: 1 |
| weakref_baton = weakref.ref(baton) |
| self.client_ctx.log_msg_baton2 = baton |
| baton = None |
| self.assertEqual(self.client_ctx.log_msg_baton2(), 1) |
| self.assertEqual(weakref_baton()(), 1) |
| self.client_ctx.log_msg_baton2 = None |
| self.assertEqual(self.client_ctx.log_msg_baton2, None) |
| self.assertEqual(weakref_baton(), None) |
| |
| # External objects should retain their current parent pool |
| self.assertNotEqual(self.client_ctx._parent_pool, |
| self.client_ctx.auth_baton._parent_pool) |
| |
| # notify_func2 and notify_baton2 were generated by |
| # svn_client_create_context, so they should have |
| # the same pool as the context |
| self.assertEqual(self.client_ctx._parent_pool, |
| self.client_ctx.notify_func2._parent_pool) |
| self.assertEqual(self.client_ctx._parent_pool, |
| self.client_ctx.notify_baton2._parent_pool) |
| |
| def testMethodCalls(self): |
| """Test direct method calls to callbacks""" |
| |
| # Directly invoking the msg_baton should work |
| self.client_ctx.log_msg_baton3(None, None) |
| b = self.client_ctx.log_msg_baton3 |
| b(None, None) |
| self.assertEqual(self.log_message_func_calls, 2) |
| |
| # You can also invoke the log_msg_func3. It'd be |
| # nice if we could get log_msg_func3 function |
| # to invoke the baton function, but, in order to do that, |
| # we'd need to supply a value for the first parameter. |
| self.client_ctx.log_msg_func3(None, self.client_ctx.log_msg_baton3) |
| |
| def info_receiver(self, path, info, pool): |
| """Squirrel away the output from 'svn info' so that the unit tests |
| can get at them.""" |
| self.path = path |
| self.info = info |
| |
| def test_client_ctx_baton_lifetime(self): |
| pool = core.Pool() |
| temp_client_ctx = client.svn_client_create_context(pool) |
| |
| # We keep track of these objects in separate variables here |
| # because you can't get a PyObject back out of a PY_AS_VOID field |
| test_object1 = lambda *args: b"message 1" |
| test_object2 = lambda *args: b"message 2" |
| |
| # Verify that the refcount of a Python object is incremented when |
| # you insert it into a PY_AS_VOID field. |
| temp_client_ctx.log_msg_baton2 = test_object1 |
| test_object1 = weakref.ref(test_object1) |
| self.assertNotEqual(test_object1(), None) |
| |
| # Verify that the refcount of the previous Python object is decremented |
| # when a PY_AS_VOID field is replaced. |
| temp_client_ctx.log_msg_baton2 = test_object2 |
| self.assertEqual(test_object1(), None) |
| |
| # Verify that the reference count of the new Python object (which |
| # replaced test_object1) was incremented. |
| test_object2 = weakref.ref(test_object2) |
| self.assertNotEqual(test_object2(), None) |
| |
| # Verify that the reference count of test_object2 is decremented when |
| # test_client_ctx is destroyed. |
| temp_client_ctx = None |
| self.assertEqual(test_object2(), None) |
| |
| def test_checkout(self): |
| """Test svn_client_checkout2.""" |
| |
| rev = core.svn_opt_revision_t() |
| rev.kind = core.svn_opt_revision_head |
| |
| path = self.temper.alloc_empty_dir('-checkout') |
| |
| self.assertRaises(ValueError, client.checkout2, |
| self.repos_uri, path, None, None, True, True, |
| self.client_ctx) |
| |
| client.checkout2(self.repos_uri, path, rev, rev, True, True, |
| self.client_ctx) |
| |
| def test_info(self): |
| """Test svn_client_info on an empty repository""" |
| |
| # Run info |
| revt = core.svn_opt_revision_t() |
| revt.kind = core.svn_opt_revision_head |
| client.info(self.repos_uri, revt, revt, self.info_receiver, |
| False, self.client_ctx) |
| |
| # Check output from running info. This also serves to verify that |
| # the internal 'info' object is still valid |
| self.assertEqual(self.path, os.path.basename(self.repos_path)) |
| self.info.assert_valid() |
| self.assertEqual(self.info.URL, self.repos_uri) |
| self.assertEqual(self.info.repos_root_URL, self.repos_uri) |
| |
| def test_mkdir_url(self): |
| """Test svn_client_mkdir2 on a file:// URL""" |
| directory = urljoin(self.repos_uri+b"/", b"dir1") |
| |
| commit_info = client.mkdir2((directory,), self.client_ctx) |
| self.assertEqual(commit_info.revision, 13) |
| self.assertEqual(self.log_message_func_calls, 1) |
| |
| def test_mkdir_url_with_revprops(self): |
| """Test svn_client_mkdir3 on a file:// URL, with added revprops""" |
| directory = urljoin(self.repos_uri+b"/", b"some/deep/subdir") |
| |
| commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'}, |
| self.client_ctx) |
| self.assertEqual(commit_info.revision, 13) |
| self.assertEqual(self.log_message_func_calls, 1) |
| |
| def test_get_commit_log3_callback_accept_unicode(self): |
| """Test svn_client_get_commit_log3_t callback wrapper accept unicode as return value""" |
| directory = urljoin(self.repos_uri+b"/", b"dir1") |
| # override callback function which returns commit log as unicode |
| unicode_log_message_func = self.make_log_message_func(u"Test log message") |
| self.client_ctx.log_msg_baton3 = unicode_log_message_func |
| |
| commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'}, |
| self.client_ctx) |
| self.assertEqual(commit_info.revision, 13) |
| self.assertEqual(self.log_message_func_calls, 1) |
| |
| def test_get_commit_log3_callback_unicode_error(self): |
| """Test svn_client_get_commit_log3_t callback wrapper handles UnicodeEncodeError correctly""" |
| directory = urljoin(self.repos_uri+b"/", b"dir1") |
| # override callback function which returns commit log as unicode |
| # which contains surrogate escaped character |
| bogus_log_message_func = self.make_log_message_func(u"Test \udc6cog" |
| u" message") |
| self.client_ctx.log_msg_baton3 = bogus_log_message_func |
| |
| if not utils.IS_PY3 and utils.is_defaultencoding_utf8(): |
| # 'utf-8' codecs on Python 2 does not raise UnicodeEncodeError |
| # on surrogate code point U+dc00 - U+dcff, however it causes |
| # Subversion error on property validation of svn:log |
| with self.assertRaises(core.SubversionException): |
| commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'}, |
| self.client_ctx) |
| else: |
| with self.assertRaises(UnicodeEncodeError): |
| commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'}, |
| self.client_ctx) |
| |
| def test_log3_url(self): |
| """Test svn_client_log3 on a file:// URL""" |
| directory = urljoin(self.repos_uri+b"/", b"trunk/dir1") |
| |
| start = core.svn_opt_revision_t() |
| end = core.svn_opt_revision_t() |
| core.svn_opt_parse_revision(start, end, b"4:0") |
| client.log3((directory,), start, start, end, 1, True, False, |
| self.log_receiver, self.client_ctx) |
| self.assertEqual(self.change_author, b"john") |
| self.assertEqual(self.log_message, b"More directories.") |
| self.assertEqual(len(self.changed_paths), 3) |
| for dir in (b'/trunk/dir1', b'/trunk/dir2', b'/trunk/dir3'): |
| self.assertTrue(dir in self.changed_paths) |
| self.assertEqual(self.changed_paths[dir].action, b'A') |
| |
| def test_log5(self): |
| """Test svn_client_log5.""" |
| start = core.svn_opt_revision_t() |
| start.kind = core.svn_opt_revision_number |
| start.value.number = 0 |
| |
| end = core.svn_opt_revision_t() |
| end.kind = core.svn_opt_revision_number |
| end.value.number = 4 |
| |
| rev_range = core.svn_opt_revision_range_t() |
| rev_range.start = start |
| rev_range.end = end |
| |
| self.received_revisions = [] |
| |
| client.log5((self.repos_uri,), end, (rev_range,), 0, False, True, False, (), |
| self.log_entry_receiver, self.client_ctx) |
| |
| self.assertEqual(self.received_revisions, list(range(0, 5))) |
| |
| def test_log5_revprops(self): |
| """Test svn_client_log5 revprops (for typemap(in) apr_array_t *STRINGLIST)""" |
| directory = urljoin(self.repos_uri+b"/", b"trunk/dir1") |
| start = core.svn_opt_revision_t() |
| end = core.svn_opt_revision_t() |
| core.svn_opt_parse_revision(start, end, b"4:0") |
| rev_range = core.svn_opt_revision_range_t() |
| rev_range.start = start |
| rev_range.end = end |
| entry_pool = core.Pool() |
| |
| def log_entry_receiver_whole(log_entry, pool): |
| """An implementation of svn_log_entry_receiver_t, holds whole log entries.""" |
| self.received_log_entries.append(core.svn_log_entry_dup(log_entry, |
| entry_pool)) |
| |
| self.received_log_entries = [] |
| |
| # (Pass tuple of bytes and str(unicode) mixture as revprops argument) |
| client.log5((directory,), start, (rev_range,), 1, True, False, False, |
| (u'svn:author', b'svn:log'), |
| log_entry_receiver_whole, self.client_ctx) |
| self.assertEqual(len(self.received_log_entries), 1) |
| revprops = self.received_log_entries[0].revprops |
| self.assertEqual(revprops[b'svn:log'], b"More directories.") |
| self.assertEqual(revprops[b'svn:author'], b"john") |
| with self.assertRaises(KeyError): |
| commit_date = revprops['svn:date'] |
| if utils.IS_PY3 or not utils.is_defaultencoding_utf8(): |
| # 'utf-8' codecs on Python 2 does not raise UnicodeEncodeError |
| # on surrogate code point U+dc00 - U+dcff. So we need to skip |
| # below in such a case. |
| with self.assertRaises(UnicodeEncodeError): |
| client.log5((directory,), start, (rev_range,), 1, True, False, False, |
| (u'svn:\udc61uthor', b'svn:log'), |
| log_entry_receiver_whole, self.client_ctx) |
| |
| def test_uuid_from_url(self): |
| """Test svn_client_uuid_from_url on a file:// URL""" |
| self.assertTrue(isinstance( |
| client.uuid_from_url(self.repos_uri, self.client_ctx), |
| bytes)) |
| |
| def test_url_from_path(self): |
| """Test svn_client_url_from_path for a file:// URL""" |
| self.assertEqual(client.url_from_path(self.repos_uri), self.repos_uri) |
| |
| rev = core.svn_opt_revision_t() |
| rev.kind = core.svn_opt_revision_head |
| |
| path = self.temper.alloc_empty_dir('-url_from_path') |
| |
| client.checkout2(self.repos_uri, path, rev, rev, True, True, |
| self.client_ctx) |
| |
| self.assertEqual(client.url_from_path(path), self.repos_uri) |
| |
| def test_uuid_from_path(self): |
| """Test svn_client_uuid_from_path.""" |
| rev = core.svn_opt_revision_t() |
| rev.kind = core.svn_opt_revision_head |
| |
| path = self.temper.alloc_empty_dir('-uuid_from_path') |
| |
| client.checkout2(self.repos_uri, path, rev, rev, True, True, |
| self.client_ctx) |
| |
| wc_adm = wc.adm_open3(None, path, False, 0, None) |
| |
| self.assertEqual(client.uuid_from_path(path, wc_adm, self.client_ctx), |
| client.uuid_from_url(self.repos_uri, self.client_ctx)) |
| |
| self.assertTrue(isinstance(client.uuid_from_path(path, wc_adm, |
| self.client_ctx), bytes)) |
| |
| def test_open_ra_session(self): |
| """Test svn_client_open_ra_session().""" |
| client.open_ra_session(self.repos_uri, self.client_ctx) |
| |
| |
| def test_info_file(self): |
| """Test svn_client_info on working copy file and remote files.""" |
| |
| # This test requires a file /trunk/README.txt of size 8 bytes |
| # in the repository. |
| rev = core.svn_opt_revision_t() |
| rev.kind = core.svn_opt_revision_head |
| wc_path = self.temper.alloc_empty_dir('-info_file') |
| |
| client.checkout2(self.repos_uri, wc_path, rev, rev, True, True, |
| self.client_ctx) |
| adm_access = wc.adm_open3(None, wc_path, True, -1, None) |
| |
| try: |
| # Test 1: Run info -r BASE. We expect the size value to be filled in. |
| rev.kind = core.svn_opt_revision_base |
| readme_path = b'%s/trunk/README.txt' % wc_path |
| readme_url = b'%s/trunk/README.txt' % self.repos_uri |
| client.info(readme_path, rev, rev, self.info_receiver, |
| False, self.client_ctx) |
| |
| self.assertEqual(self.path, os.path.basename(readme_path)) |
| self.info.assert_valid() |
| self.assertEqual(self.info.working_size, client.SWIG_SVN_INFO_SIZE_UNKNOWN) |
| self.assertEqual(self.info.size, 8) |
| |
| # Test 2: Run info (revision unspecified). We expect the working_size value |
| # to be filled in. |
| rev.kind = core.svn_opt_revision_unspecified |
| client.info(readme_path, rev, rev, self.info_receiver, |
| False, self.client_ctx) |
| |
| self.assertEqual(self.path, readme_path) |
| self.info.assert_valid() |
| self.assertEqual(self.info.size, client.SWIG_SVN_INFO_SIZE_UNKNOWN) |
| # README.txt contains one EOL char, so on Windows it will be expanded from |
| # LF to CRLF hence the working_size will be 9 instead of 8. |
| if os.name == 'nt': |
| self.assertEqual(self.info.working_size, 9) |
| else: |
| self.assertEqual(self.info.working_size, 8) |
| |
| # Test 3: Run info on the repository URL of README.txt. We expect the size |
| # value to be filled in. |
| rev.kind = core.svn_opt_revision_head |
| client.info(readme_url, rev, rev, self.info_receiver, |
| False, self.client_ctx) |
| self.info.assert_valid() |
| self.assertEqual(self.info.working_size, client.SWIG_SVN_INFO_SIZE_UNKNOWN) |
| self.assertEqual(self.info.size, 8) |
| finally: |
| wc.adm_close(adm_access) |
| |
| def test_merge_peg3(self): |
| """Test svn_client_merge_peg3.""" |
| head = core.svn_opt_revision_t() |
| head.kind = core.svn_opt_revision_head |
| wc_path = self.temper.alloc_empty_dir('-merge_peg3') |
| |
| client.checkout3(self.repos_uri, wc_path, head, head, core.svn_depth_infinity, |
| True, False, self.client_ctx) |
| |
| # Let's try to backport a change from the v1x branch |
| trunk_path = core.svn_dirent_join(wc_path, b'trunk') |
| v1x_path = core.svn_dirent_join(wc_path, b'branches/v1x') |
| |
| start = core.svn_opt_revision_t() |
| start.kind = core.svn_opt_revision_number |
| start.value.number = 8 |
| |
| end = core.svn_opt_revision_t() |
| end.kind = core.svn_opt_revision_number |
| end.value.number = 9 |
| |
| rrange = core.svn_opt_revision_range_t() |
| rrange.start = start |
| rrange.end = end |
| |
| client.merge_peg3(v1x_path, (rrange,), end, trunk_path, |
| core.svn_depth_infinity, False, False, False, False, |
| None, self.client_ctx) |
| |
| # Did it take effect? |
| readme_path_native = core.svn_dirent_local_style( |
| core.svn_dirent_join(trunk_path, b'README.txt') |
| ) |
| |
| readme = open(readme_path_native, 'rb') |
| readme_text = readme.read() |
| readme.close() |
| |
| self.assertEqual(readme_text, |
| b'This is a test.' + os.linesep.encode('UTF-8')) |
| |
| def test_platform_providers(self): |
| providers = core.svn_auth_get_platform_specific_client_providers(None, None) |
| # Not much more we can test in this minimal environment. |
| self.assertTrue(isinstance(providers, list)) |
| self.assert_all_instances_of(providers, core.svn_auth_provider_object_t) |
| |
| def testGnomeKeyring(self): |
| if getattr(core, 'svn_auth_set_gnome_keyring_unlock_prompt_func', None) is None: |
| # gnome-keying not compiled in, do nothing |
| return |
| |
| # This tests setting the gnome-keyring unlock prompt function as an |
| # auth baton parameter. It doesn't actually call gnome-keyring |
| # stuff, since that would require having a gnome-keyring running. We |
| # just test if this doesn't error out, there's not even a return |
| # value to test. |
| def prompt_func(realm_string, pool): |
| return b"Foo" |
| |
| core.svn_auth_set_gnome_keyring_unlock_prompt_func(self.client_ctx.auth_baton, prompt_func) |
| |
| def proplist_receiver_trunk(self, path, props, iprops, pool): |
| self.assertEqual(props[b'svn:global-ignores'], b'*.q\n') |
| self.proplist_receiver_trunk_calls += 1 |
| |
| def proplist_receiver_dir1(self, path, props, iprops, pool): |
| self.assertEqual(iprops[self.proplist_receiver_dir1_key], |
| {b'svn:global-ignores':b'*.q\n'}) |
| self.proplist_receiver_dir1_calls += 1 |
| |
| def test_inherited_props(self): |
| """Test inherited props""" |
| |
| trunk_url = self.repos_uri + b'/trunk' |
| client.propset_remote(b'svn:global-ignores', b'*.q', trunk_url, |
| False, 12, {}, None, self.client_ctx) |
| |
| head = core.svn_opt_revision_t() |
| head.kind = core.svn_opt_revision_head |
| props, iprops, rev = client.propget5(b'svn:global-ignores', trunk_url, |
| head, head, core.svn_depth_infinity, |
| None, self.client_ctx) |
| self.assertEqual(props[trunk_url], b'*.q\n') |
| |
| dir1_url = trunk_url + b'/dir1' |
| props, iprops, rev = client.propget5(b'svn:global-ignores', dir1_url, |
| head, head, core.svn_depth_infinity, |
| None, self.client_ctx) |
| self.assertEqual(iprops[trunk_url], {b'svn:global-ignores':b'*.q\n'}) |
| |
| self.proplist_receiver_trunk_calls = 0 |
| client.proplist4(trunk_url, head, head, core.svn_depth_empty, None, True, |
| self.proplist_receiver_trunk, self.client_ctx) |
| self.assertEqual(self.proplist_receiver_trunk_calls, 1) |
| |
| self.proplist_receiver_dir1_calls = 0 |
| self.proplist_receiver_dir1_key = trunk_url |
| client.proplist4(dir1_url, head, head, core.svn_depth_empty, None, True, |
| self.proplist_receiver_dir1, self.client_ctx) |
| self.assertEqual(self.proplist_receiver_dir1_calls, 1) |
| |
| def test_propset_local(self): |
| """Test svn_client_propset_local. |
| (also, testing const svn_string_t * input)""" |
| |
| head = core.svn_opt_revision_t() |
| head.kind = core.svn_opt_revision_head |
| unspecified = core.svn_opt_revision_t() |
| unspecified.kind = core.svn_opt_revision_working |
| |
| path = self.temper.alloc_empty_dir('-propset_local') |
| |
| target_path = core.svn_dirent_join(path, b'trunk/README.txt') |
| target_prop = b'local_prop_test' |
| prop_val1 = b'foo' |
| |
| co_rev = client.checkout3(self.repos_uri, path, head, head, |
| core.svn_depth_infinity, True, True, |
| self.client_ctx) |
| |
| client.propset_local(target_prop, prop_val1, [target_path], |
| core.svn_depth_empty, False, None, self.client_ctx) |
| props, iprops, prop_rev = client.propget5(target_prop, target_path, |
| unspecified, unspecified, |
| core.svn_depth_empty, |
| None, self.client_ctx) |
| self.assertFalse(iprops) |
| self.assertEqual(prop_rev, co_rev) |
| self.assertEqual(props, { target_path : prop_val1 }) |
| |
| # Using str(unicode) to specify property value. |
| prop_val2 = b'bar' |
| client.propset_local(target_prop, prop_val2.decode('utf-8'), [target_path], |
| core.svn_depth_empty, False, None, self.client_ctx) |
| props, iprops, prop_rev = client.propget5(target_prop, target_path, |
| unspecified, unspecified, |
| core.svn_depth_empty, |
| None, self.client_ctx) |
| self.assertEqual(props, { target_path : prop_val2 }) |
| |
| # Using str(unicode) and check if it uses 'utf-8' codecs on Python 3 |
| # (or Python 2, only if its default encoding is 'utf-8') |
| if utils.IS_PY3 or utils.is_defaultencoding_utf8(): |
| # prop_val3 = '(checkmark)UNICODE' |
| prop_val3_str = (u'\u2705\U0001F1FA\U0001F1F3\U0001F1EE' |
| u'\U0001F1E8\U0001F1F4\U0001F1E9\U0001F1EA') |
| client.propset_local(target_prop, prop_val3_str, [target_path], |
| core.svn_depth_empty, False, None, self.client_ctx) |
| props, iprops, prop_rev = client.propget5(target_prop, target_path, |
| unspecified, unspecified, |
| core.svn_depth_empty, |
| None, self.client_ctx) |
| self.assertEqual(props, { target_path : prop_val3_str.encode('utf-8') }) |
| |
| def test_update4(self): |
| """Test update and the notify function callbacks""" |
| |
| rev = core.svn_opt_revision_t() |
| rev.kind = core.svn_opt_revision_number |
| rev.value.number = 0 |
| |
| path = self.temper.alloc_empty_dir('-update') |
| |
| self.assertRaises(ValueError, client.checkout2, |
| self.repos_uri, path, None, None, True, True, |
| self.client_ctx) |
| |
| client.checkout2(self.repos_uri, path, rev, rev, True, True, |
| self.client_ctx) |
| |
| def notify_func(path, action, kind, mime_type, content_state, prop_state, rev): |
| self.notified_paths.append(path) |
| |
| PATH_SEPARATOR = os.path.sep |
| if not isinstance(PATH_SEPARATOR, bytes): |
| PATH_SEPARATOR = PATH_SEPARATOR.encode('UTF-8') |
| self.client_ctx.notify_func = client.svn_swig_py_notify_func |
| self.client_ctx.notify_baton = notify_func |
| rev.value.number = 1 |
| self.notified_paths = [] |
| client.update4((path,), rev, core.svn_depth_unknown, True, False, False, |
| False, False, self.client_ctx) |
| expected_paths = [ |
| path, |
| os.path.join(path, b'branches'), |
| os.path.join(path, b'tags'), |
| os.path.join(path, b'trunk'), |
| path, |
| path |
| ] |
| # All normal subversion apis process paths in Subversion's canonical format, |
| # which isn't the platform specific format |
| expected_paths = [x.replace(PATH_SEPARATOR, b'/') for x in expected_paths] |
| self.notified_paths.sort() |
| expected_paths.sort() |
| |
| self.assertEqual(self.notified_paths, expected_paths) |
| |
| def notify_func2(notify, pool): |
| self.notified_paths.append(notify.path) |
| |
| self.client_ctx.notify_func2 = client.svn_swig_py_notify_func2 |
| self.client_ctx.notify_baton2 = notify_func2 |
| rev.value.number = 2 |
| self.notified_paths = [] |
| expected_paths = [ |
| path, |
| os.path.join(path, b'trunk', b'README.txt'), |
| os.path.join(path, b'trunk'), |
| path, |
| path |
| ] |
| expected_paths = [x.replace(PATH_SEPARATOR, b'/') for x in expected_paths] |
| client.update4((path,), rev, core.svn_depth_unknown, True, False, False, |
| False, False, self.client_ctx) |
| self.notified_paths.sort() |
| expected_paths.sort() |
| self.assertEqual(self.notified_paths, expected_paths) |
| |
| def test_conflict(self): |
| """Test conflict api.""" |
| |
| rev = core.svn_opt_revision_t() |
| rev.kind = core.svn_opt_revision_number |
| rev.value.number = 0 |
| |
| path = self.temper.alloc_empty_dir('-conflict') |
| |
| client.checkout2(self.repos_uri, path, rev, rev, True, True, |
| self.client_ctx) |
| |
| trunk_path = core.svn_dirent_join(path, b'trunk') |
| |
| # Create a conflicting path |
| os.mkdir(core.svn_dirent_local_style(trunk_path)) |
| |
| rev.value.number = 2 |
| |
| client.update4((path,), rev, core.svn_depth_unknown, True, False, False, |
| False, False, self.client_ctx) |
| |
| pool = core.Pool() |
| conflict = client.conflict_get(trunk_path, self.client_ctx, pool) |
| |
| self.assertTrue(isinstance(conflict, client.svn_client_conflict_t)) |
| |
| conflict_opts = client.conflict_tree_get_resolution_options(conflict, self.client_ctx) |
| |
| self.assertTrue(isinstance(conflict_opts, list)) |
| self.assert_all_instances_of(conflict_opts, client.svn_client_conflict_option_t) |
| |
| pool.clear() |
| |
| @unittest.skip("experimental API, not currently exposed") |
| def test_shelf(self): |
| """Test shelf api.""" |
| |
| rev = core.svn_opt_revision_t() |
| rev.kind = core.svn_opt_revision_number |
| rev.value.number = 2 |
| |
| path = self.temper.alloc_empty_dir('-shelf') |
| |
| |
| client.checkout2(self.repos_uri, path, rev, rev, True, True, |
| self.client_ctx) |
| |
| pool = core.Pool() |
| shelf = client._shelf_open_or_create(b"test1", path, self.client_ctx, pool) |
| |
| self.assertTrue(isinstance(shelf, client.svn_client__shelf_t)) |
| |
| new_subpath = core.svn_relpath_join(b'trunk', b'new-shelf-test.txt') |
| new_path = core.svn_dirent_join(path, new_subpath) |
| |
| with open(core.svn_dirent_local_style(new_path), "wb") as fp: |
| fp.write("A new text file\n".encode('utf8')) |
| |
| client.add5(new_path, core.svn_depth_unknown, False, False, False, True, self.client_ctx, pool) |
| |
| statused_paths = [] |
| def shelf_status(path, status, pool): |
| statused_paths.append(path) |
| |
| shelf_version = client._shelf_save_new_version3(shelf, (new_path, ), core.svn_depth_unknown, |
| None, shelf_status, None, pool) |
| |
| self.assertTrue(isinstance(shelf_version, client.svn_client__shelf_version_t)) |
| |
| all_versions = client._shelf_get_all_versions(shelf, pool, pool) |
| |
| self.assertEqual(1, len(all_versions)) |
| self.assertTrue(isinstance(all_versions[0], client.svn_client__shelf_version_t)) |
| self.assertEqual(shelf_version.version_number, all_versions[0].version_number) |
| self.assertIn(new_subpath, statused_paths) |
| |
| client._shelf_close(shelf, pool) |
| |
| pool.clear() |
| |
| |
| def suite(): |
| return unittest.defaultTestLoader.loadTestsFromTestCase( |
| SubversionClientTestCase) |
| |
| if __name__ == '__main__': |
| runner = unittest.TextTestRunner() |
| runner.run(suite()) |
| |