blob: 258d9d3bf5982fe8db925e8b3cfceb03b33debc5 [file] [log] [blame]
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
import unittest, os, weakref, setup_path, utils
from svn import core, client, wc
try:
# Python >=3.0
from urllib.parse import urljoin
except ImportError:
# Python <3.0
from urlparse import urljoin
class SubversionClientTestCase(unittest.TestCase):
"""Test cases for the basic SWIG Subversion client layer"""
def assert_all_instances_of(self, iterable, instancetype):
"""Asserts that all object from iterable are an instance of instancetype."""
self.assertTrue(not [x for x in iterable if not isinstance(x, instancetype)])
def log_message_func(self, items, pool):
""" Simple log message provider for unit tests. """
self.log_message_func_calls += 1
return b"Test log message"
def make_log_message_func(self, message):
def generic_log_message_func(items, pool):
self.log_message_func_calls += 1
return message
return generic_log_message_func
def log_receiver(self, changed_paths, revision, author, date, message, pool):
""" Function to receive log messages retrieved by client.log3(). """
self.log_message = message
self.change_author = author
self.changed_paths = changed_paths
def log_entry_receiver(self, log_entry, pool):
"""An implementation of svn_log_entry_receiver_t."""
self.received_revisions.append(log_entry.revision)
def setUp(self):
"""Set up authentication and client context"""
self.client_ctx = client.svn_client_create_context()
self.assertEqual(self.client_ctx.log_msg_baton2, None)
self.assertEqual(self.client_ctx.log_msg_func2, None)
self.assertEqual(self.client_ctx.log_msg_baton3, None)
self.assertEqual(self.client_ctx.log_msg_func3, None)
self.client_ctx.log_msg_func3 = client.svn_swig_py_get_commit_log_func
self.client_ctx.log_msg_baton3 = self.log_message_func
self.log_message_func_calls = 0
self.log_message = None
self.changed_paths = None
self.change_author = None
providers = [
client.svn_client_get_simple_provider(),
client.svn_client_get_username_provider(),
]
self.client_ctx.auth_baton = core.svn_auth_open(providers)
self.temper = utils.Temper()
(_, self.repos_path, self.repos_uri) = self.temper.alloc_known_repo(
'trac/versioncontrol/tests/svnrepos.dump', suffix='-client')
def tearDown(self):
# We have to free client_ctx first, since it may be holding handles
# to WC DBs
del self.client_ctx
self.temper.cleanup()
def testBatonPlay(self):
"""Test playing with C batons"""
baton = lambda: 1
weakref_baton = weakref.ref(baton)
self.client_ctx.log_msg_baton2 = baton
baton = None
self.assertEqual(self.client_ctx.log_msg_baton2(), 1)
self.assertEqual(weakref_baton()(), 1)
self.client_ctx.log_msg_baton2 = None
self.assertEqual(self.client_ctx.log_msg_baton2, None)
self.assertEqual(weakref_baton(), None)
# External objects should retain their current parent pool
self.assertNotEqual(self.client_ctx._parent_pool,
self.client_ctx.auth_baton._parent_pool)
# notify_func2 and notify_baton2 were generated by
# svn_client_create_context, so they should have
# the same pool as the context
self.assertEqual(self.client_ctx._parent_pool,
self.client_ctx.notify_func2._parent_pool)
self.assertEqual(self.client_ctx._parent_pool,
self.client_ctx.notify_baton2._parent_pool)
def testMethodCalls(self):
"""Test direct method calls to callbacks"""
# Directly invoking the msg_baton should work
self.client_ctx.log_msg_baton3(None, None)
b = self.client_ctx.log_msg_baton3
b(None, None)
self.assertEqual(self.log_message_func_calls, 2)
# You can also invoke the log_msg_func3. It'd be
# nice if we could get log_msg_func3 function
# to invoke the baton function, but, in order to do that,
# we'd need to supply a value for the first parameter.
self.client_ctx.log_msg_func3(None, self.client_ctx.log_msg_baton3)
def info_receiver(self, path, info, pool):
"""Squirrel away the output from 'svn info' so that the unit tests
can get at them."""
self.path = path
self.info = info
def test_client_ctx_baton_lifetime(self):
pool = core.Pool()
temp_client_ctx = client.svn_client_create_context(pool)
# We keep track of these objects in separate variables here
# because you can't get a PyObject back out of a PY_AS_VOID field
test_object1 = lambda *args: b"message 1"
test_object2 = lambda *args: b"message 2"
# Verify that the refcount of a Python object is incremented when
# you insert it into a PY_AS_VOID field.
temp_client_ctx.log_msg_baton2 = test_object1
test_object1 = weakref.ref(test_object1)
self.assertNotEqual(test_object1(), None)
# Verify that the refcount of the previous Python object is decremented
# when a PY_AS_VOID field is replaced.
temp_client_ctx.log_msg_baton2 = test_object2
self.assertEqual(test_object1(), None)
# Verify that the reference count of the new Python object (which
# replaced test_object1) was incremented.
test_object2 = weakref.ref(test_object2)
self.assertNotEqual(test_object2(), None)
# Verify that the reference count of test_object2 is decremented when
# test_client_ctx is destroyed.
temp_client_ctx = None
self.assertEqual(test_object2(), None)
def test_checkout(self):
"""Test svn_client_checkout2."""
rev = core.svn_opt_revision_t()
rev.kind = core.svn_opt_revision_head
path = self.temper.alloc_empty_dir('-checkout')
self.assertRaises(ValueError, client.checkout2,
self.repos_uri, path, None, None, True, True,
self.client_ctx)
client.checkout2(self.repos_uri, path, rev, rev, True, True,
self.client_ctx)
def test_info(self):
"""Test svn_client_info on an empty repository"""
# Run info
revt = core.svn_opt_revision_t()
revt.kind = core.svn_opt_revision_head
client.info(self.repos_uri, revt, revt, self.info_receiver,
False, self.client_ctx)
# Check output from running info. This also serves to verify that
# the internal 'info' object is still valid
self.assertEqual(self.path, os.path.basename(self.repos_path))
self.info.assert_valid()
self.assertEqual(self.info.URL, self.repos_uri)
self.assertEqual(self.info.repos_root_URL, self.repos_uri)
def test_mkdir_url(self):
"""Test svn_client_mkdir2 on a file:// URL"""
directory = urljoin(self.repos_uri+b"/", b"dir1")
commit_info = client.mkdir2((directory,), self.client_ctx)
self.assertEqual(commit_info.revision, 13)
self.assertEqual(self.log_message_func_calls, 1)
def test_mkdir_url_with_revprops(self):
"""Test svn_client_mkdir3 on a file:// URL, with added revprops"""
directory = urljoin(self.repos_uri+b"/", b"some/deep/subdir")
commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'},
self.client_ctx)
self.assertEqual(commit_info.revision, 13)
self.assertEqual(self.log_message_func_calls, 1)
def test_get_commit_log3_callback_accept_unicode(self):
"""Test svn_client_get_commit_log3_t callback wrapper accept unicode as return value"""
directory = urljoin(self.repos_uri+b"/", b"dir1")
# override callback function which returns commit log as unicode
unicode_log_message_func = self.make_log_message_func(u"Test log message")
self.client_ctx.log_msg_baton3 = unicode_log_message_func
commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'},
self.client_ctx)
self.assertEqual(commit_info.revision, 13)
self.assertEqual(self.log_message_func_calls, 1)
def test_get_commit_log3_callback_unicode_error(self):
"""Test svn_client_get_commit_log3_t callback wrapper handles UnicodeEncodeError correctly"""
directory = urljoin(self.repos_uri+b"/", b"dir1")
# override callback function which returns commit log as unicode
# which contains surrogate escaped character
bogus_log_message_func = self.make_log_message_func(u"Test \udc6cog"
u" message")
self.client_ctx.log_msg_baton3 = bogus_log_message_func
if not utils.IS_PY3 and utils.is_defaultencoding_utf8():
# 'utf-8' codecs on Python 2 does not raise UnicodeEncodeError
# on surrogate code point U+dc00 - U+dcff, however it causes
# Subversion error on property validation of svn:log
with self.assertRaises(core.SubversionException):
commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'},
self.client_ctx)
else:
with self.assertRaises(UnicodeEncodeError):
commit_info = client.mkdir3((directory,), 1, {b'customprop':b'value'},
self.client_ctx)
def test_log3_url(self):
"""Test svn_client_log3 on a file:// URL"""
directory = urljoin(self.repos_uri+b"/", b"trunk/dir1")
start = core.svn_opt_revision_t()
end = core.svn_opt_revision_t()
core.svn_opt_parse_revision(start, end, b"4:0")
client.log3((directory,), start, start, end, 1, True, False,
self.log_receiver, self.client_ctx)
self.assertEqual(self.change_author, b"john")
self.assertEqual(self.log_message, b"More directories.")
self.assertEqual(len(self.changed_paths), 3)
for dir in (b'/trunk/dir1', b'/trunk/dir2', b'/trunk/dir3'):
self.assertTrue(dir in self.changed_paths)
self.assertEqual(self.changed_paths[dir].action, b'A')
def test_log5(self):
"""Test svn_client_log5."""
start = core.svn_opt_revision_t()
start.kind = core.svn_opt_revision_number
start.value.number = 0
end = core.svn_opt_revision_t()
end.kind = core.svn_opt_revision_number
end.value.number = 4
rev_range = core.svn_opt_revision_range_t()
rev_range.start = start
rev_range.end = end
self.received_revisions = []
client.log5((self.repos_uri,), end, (rev_range,), 0, False, True, False, (),
self.log_entry_receiver, self.client_ctx)
self.assertEqual(self.received_revisions, list(range(0, 5)))
def test_log5_revprops(self):
"""Test svn_client_log5 revprops (for typemap(in) apr_array_t *STRINGLIST)"""
directory = urljoin(self.repos_uri+b"/", b"trunk/dir1")
start = core.svn_opt_revision_t()
end = core.svn_opt_revision_t()
core.svn_opt_parse_revision(start, end, b"4:0")
rev_range = core.svn_opt_revision_range_t()
rev_range.start = start
rev_range.end = end
entry_pool = core.Pool()
def log_entry_receiver_whole(log_entry, pool):
"""An implementation of svn_log_entry_receiver_t, holds whole log entries."""
self.received_log_entries.append(core.svn_log_entry_dup(log_entry,
entry_pool))
self.received_log_entries = []
# (Pass tuple of bytes and str(unicode) mixture as revprops argument)
client.log5((directory,), start, (rev_range,), 1, True, False, False,
(u'svn:author', b'svn:log'),
log_entry_receiver_whole, self.client_ctx)
self.assertEqual(len(self.received_log_entries), 1)
revprops = self.received_log_entries[0].revprops
self.assertEqual(revprops[b'svn:log'], b"More directories.")
self.assertEqual(revprops[b'svn:author'], b"john")
with self.assertRaises(KeyError):
commit_date = revprops['svn:date']
if utils.IS_PY3 or not utils.is_defaultencoding_utf8():
# 'utf-8' codecs on Python 2 does not raise UnicodeEncodeError
# on surrogate code point U+dc00 - U+dcff. So we need to skip
# below in such a case.
with self.assertRaises(UnicodeEncodeError):
client.log5((directory,), start, (rev_range,), 1, True, False, False,
(u'svn:\udc61uthor', b'svn:log'),
log_entry_receiver_whole, self.client_ctx)
def test_uuid_from_url(self):
"""Test svn_client_uuid_from_url on a file:// URL"""
self.assertTrue(isinstance(
client.uuid_from_url(self.repos_uri, self.client_ctx),
bytes))
def test_url_from_path(self):
"""Test svn_client_url_from_path for a file:// URL"""
self.assertEqual(client.url_from_path(self.repos_uri), self.repos_uri)
rev = core.svn_opt_revision_t()
rev.kind = core.svn_opt_revision_head
path = self.temper.alloc_empty_dir('-url_from_path')
client.checkout2(self.repos_uri, path, rev, rev, True, True,
self.client_ctx)
self.assertEqual(client.url_from_path(path), self.repos_uri)
def test_uuid_from_path(self):
"""Test svn_client_uuid_from_path."""
rev = core.svn_opt_revision_t()
rev.kind = core.svn_opt_revision_head
path = self.temper.alloc_empty_dir('-uuid_from_path')
client.checkout2(self.repos_uri, path, rev, rev, True, True,
self.client_ctx)
wc_adm = wc.adm_open3(None, path, False, 0, None)
self.assertEqual(client.uuid_from_path(path, wc_adm, self.client_ctx),
client.uuid_from_url(self.repos_uri, self.client_ctx))
self.assertTrue(isinstance(client.uuid_from_path(path, wc_adm,
self.client_ctx), bytes))
def test_open_ra_session(self):
"""Test svn_client_open_ra_session()."""
client.open_ra_session(self.repos_uri, self.client_ctx)
def test_info_file(self):
"""Test svn_client_info on working copy file and remote files."""
# This test requires a file /trunk/README.txt of size 8 bytes
# in the repository.
rev = core.svn_opt_revision_t()
rev.kind = core.svn_opt_revision_head
wc_path = self.temper.alloc_empty_dir('-info_file')
client.checkout2(self.repos_uri, wc_path, rev, rev, True, True,
self.client_ctx)
adm_access = wc.adm_open3(None, wc_path, True, -1, None)
try:
# Test 1: Run info -r BASE. We expect the size value to be filled in.
rev.kind = core.svn_opt_revision_base
readme_path = b'%s/trunk/README.txt' % wc_path
readme_url = b'%s/trunk/README.txt' % self.repos_uri
client.info(readme_path, rev, rev, self.info_receiver,
False, self.client_ctx)
self.assertEqual(self.path, os.path.basename(readme_path))
self.info.assert_valid()
self.assertEqual(self.info.working_size, client.SWIG_SVN_INFO_SIZE_UNKNOWN)
self.assertEqual(self.info.size, 8)
# Test 2: Run info (revision unspecified). We expect the working_size value
# to be filled in.
rev.kind = core.svn_opt_revision_unspecified
client.info(readme_path, rev, rev, self.info_receiver,
False, self.client_ctx)
self.assertEqual(self.path, readme_path)
self.info.assert_valid()
self.assertEqual(self.info.size, client.SWIG_SVN_INFO_SIZE_UNKNOWN)
# README.txt contains one EOL char, so on Windows it will be expanded from
# LF to CRLF hence the working_size will be 9 instead of 8.
if os.name == 'nt':
self.assertEqual(self.info.working_size, 9)
else:
self.assertEqual(self.info.working_size, 8)
# Test 3: Run info on the repository URL of README.txt. We expect the size
# value to be filled in.
rev.kind = core.svn_opt_revision_head
client.info(readme_url, rev, rev, self.info_receiver,
False, self.client_ctx)
self.info.assert_valid()
self.assertEqual(self.info.working_size, client.SWIG_SVN_INFO_SIZE_UNKNOWN)
self.assertEqual(self.info.size, 8)
finally:
wc.adm_close(adm_access)
def test_merge_peg3(self):
"""Test svn_client_merge_peg3."""
head = core.svn_opt_revision_t()
head.kind = core.svn_opt_revision_head
wc_path = self.temper.alloc_empty_dir('-merge_peg3')
client.checkout3(self.repos_uri, wc_path, head, head, core.svn_depth_infinity,
True, False, self.client_ctx)
# Let's try to backport a change from the v1x branch
trunk_path = core.svn_dirent_join(wc_path, b'trunk')
v1x_path = core.svn_dirent_join(wc_path, b'branches/v1x')
start = core.svn_opt_revision_t()
start.kind = core.svn_opt_revision_number
start.value.number = 8
end = core.svn_opt_revision_t()
end.kind = core.svn_opt_revision_number
end.value.number = 9
rrange = core.svn_opt_revision_range_t()
rrange.start = start
rrange.end = end
client.merge_peg3(v1x_path, (rrange,), end, trunk_path,
core.svn_depth_infinity, False, False, False, False,
None, self.client_ctx)
# Did it take effect?
readme_path_native = core.svn_dirent_local_style(
core.svn_dirent_join(trunk_path, b'README.txt')
)
readme = open(readme_path_native, 'rb')
readme_text = readme.read()
readme.close()
self.assertEqual(readme_text,
b'This is a test.' + os.linesep.encode('UTF-8'))
def test_platform_providers(self):
providers = core.svn_auth_get_platform_specific_client_providers(None, None)
# Not much more we can test in this minimal environment.
self.assertTrue(isinstance(providers, list))
self.assert_all_instances_of(providers, core.svn_auth_provider_object_t)
def testGnomeKeyring(self):
if getattr(core, 'svn_auth_set_gnome_keyring_unlock_prompt_func', None) is None:
# gnome-keying not compiled in, do nothing
return
# This tests setting the gnome-keyring unlock prompt function as an
# auth baton parameter. It doesn't actually call gnome-keyring
# stuff, since that would require having a gnome-keyring running. We
# just test if this doesn't error out, there's not even a return
# value to test.
def prompt_func(realm_string, pool):
return b"Foo"
core.svn_auth_set_gnome_keyring_unlock_prompt_func(self.client_ctx.auth_baton, prompt_func)
def proplist_receiver_trunk(self, path, props, iprops, pool):
self.assertEqual(props[b'svn:global-ignores'], b'*.q\n')
self.proplist_receiver_trunk_calls += 1
def proplist_receiver_dir1(self, path, props, iprops, pool):
self.assertEqual(iprops[self.proplist_receiver_dir1_key],
{b'svn:global-ignores':b'*.q\n'})
self.proplist_receiver_dir1_calls += 1
def test_inherited_props(self):
"""Test inherited props"""
trunk_url = self.repos_uri + b'/trunk'
client.propset_remote(b'svn:global-ignores', b'*.q', trunk_url,
False, 12, {}, None, self.client_ctx)
head = core.svn_opt_revision_t()
head.kind = core.svn_opt_revision_head
props, iprops, rev = client.propget5(b'svn:global-ignores', trunk_url,
head, head, core.svn_depth_infinity,
None, self.client_ctx)
self.assertEqual(props[trunk_url], b'*.q\n')
dir1_url = trunk_url + b'/dir1'
props, iprops, rev = client.propget5(b'svn:global-ignores', dir1_url,
head, head, core.svn_depth_infinity,
None, self.client_ctx)
self.assertEqual(iprops[trunk_url], {b'svn:global-ignores':b'*.q\n'})
self.proplist_receiver_trunk_calls = 0
client.proplist4(trunk_url, head, head, core.svn_depth_empty, None, True,
self.proplist_receiver_trunk, self.client_ctx)
self.assertEqual(self.proplist_receiver_trunk_calls, 1)
self.proplist_receiver_dir1_calls = 0
self.proplist_receiver_dir1_key = trunk_url
client.proplist4(dir1_url, head, head, core.svn_depth_empty, None, True,
self.proplist_receiver_dir1, self.client_ctx)
self.assertEqual(self.proplist_receiver_dir1_calls, 1)
def test_propset_local(self):
"""Test svn_client_propset_local.
(also, testing const svn_string_t * input)"""
head = core.svn_opt_revision_t()
head.kind = core.svn_opt_revision_head
unspecified = core.svn_opt_revision_t()
unspecified.kind = core.svn_opt_revision_working
path = self.temper.alloc_empty_dir('-propset_local')
target_path = core.svn_dirent_join(path, b'trunk/README.txt')
target_prop = b'local_prop_test'
prop_val1 = b'foo'
co_rev = client.checkout3(self.repos_uri, path, head, head,
core.svn_depth_infinity, True, True,
self.client_ctx)
client.propset_local(target_prop, prop_val1, [target_path],
core.svn_depth_empty, False, None, self.client_ctx)
props, iprops, prop_rev = client.propget5(target_prop, target_path,
unspecified, unspecified,
core.svn_depth_empty,
None, self.client_ctx)
self.assertFalse(iprops)
self.assertEqual(prop_rev, co_rev)
self.assertEqual(props, { target_path : prop_val1 })
# Using str(unicode) to specify property value.
prop_val2 = b'bar'
client.propset_local(target_prop, prop_val2.decode('utf-8'), [target_path],
core.svn_depth_empty, False, None, self.client_ctx)
props, iprops, prop_rev = client.propget5(target_prop, target_path,
unspecified, unspecified,
core.svn_depth_empty,
None, self.client_ctx)
self.assertEqual(props, { target_path : prop_val2 })
# Using str(unicode) and check if it uses 'utf-8' codecs on Python 3
# (or Python 2, only if its default encoding is 'utf-8')
if utils.IS_PY3 or utils.is_defaultencoding_utf8():
# prop_val3 = '(checkmark)UNICODE'
prop_val3_str = (u'\u2705\U0001F1FA\U0001F1F3\U0001F1EE'
u'\U0001F1E8\U0001F1F4\U0001F1E9\U0001F1EA')
client.propset_local(target_prop, prop_val3_str, [target_path],
core.svn_depth_empty, False, None, self.client_ctx)
props, iprops, prop_rev = client.propget5(target_prop, target_path,
unspecified, unspecified,
core.svn_depth_empty,
None, self.client_ctx)
self.assertEqual(props, { target_path : prop_val3_str.encode('utf-8') })
def test_update4(self):
"""Test update and the notify function callbacks"""
rev = core.svn_opt_revision_t()
rev.kind = core.svn_opt_revision_number
rev.value.number = 0
path = self.temper.alloc_empty_dir('-update')
self.assertRaises(ValueError, client.checkout2,
self.repos_uri, path, None, None, True, True,
self.client_ctx)
client.checkout2(self.repos_uri, path, rev, rev, True, True,
self.client_ctx)
def notify_func(path, action, kind, mime_type, content_state, prop_state, rev):
self.notified_paths.append(path)
PATH_SEPARATOR = os.path.sep
if not isinstance(PATH_SEPARATOR, bytes):
PATH_SEPARATOR = PATH_SEPARATOR.encode('UTF-8')
self.client_ctx.notify_func = client.svn_swig_py_notify_func
self.client_ctx.notify_baton = notify_func
rev.value.number = 1
self.notified_paths = []
client.update4((path,), rev, core.svn_depth_unknown, True, False, False,
False, False, self.client_ctx)
expected_paths = [
path,
os.path.join(path, b'branches'),
os.path.join(path, b'tags'),
os.path.join(path, b'trunk'),
path,
path
]
# All normal subversion apis process paths in Subversion's canonical format,
# which isn't the platform specific format
expected_paths = [x.replace(PATH_SEPARATOR, b'/') for x in expected_paths]
self.notified_paths.sort()
expected_paths.sort()
self.assertEqual(self.notified_paths, expected_paths)
def notify_func2(notify, pool):
self.notified_paths.append(notify.path)
self.client_ctx.notify_func2 = client.svn_swig_py_notify_func2
self.client_ctx.notify_baton2 = notify_func2
rev.value.number = 2
self.notified_paths = []
expected_paths = [
path,
os.path.join(path, b'trunk', b'README.txt'),
os.path.join(path, b'trunk'),
path,
path
]
expected_paths = [x.replace(PATH_SEPARATOR, b'/') for x in expected_paths]
client.update4((path,), rev, core.svn_depth_unknown, True, False, False,
False, False, self.client_ctx)
self.notified_paths.sort()
expected_paths.sort()
self.assertEqual(self.notified_paths, expected_paths)
def test_conflict(self):
"""Test conflict api."""
rev = core.svn_opt_revision_t()
rev.kind = core.svn_opt_revision_number
rev.value.number = 0
path = self.temper.alloc_empty_dir('-conflict')
client.checkout2(self.repos_uri, path, rev, rev, True, True,
self.client_ctx)
trunk_path = core.svn_dirent_join(path, b'trunk')
# Create a conflicting path
os.mkdir(core.svn_dirent_local_style(trunk_path))
rev.value.number = 2
client.update4((path,), rev, core.svn_depth_unknown, True, False, False,
False, False, self.client_ctx)
pool = core.Pool()
conflict = client.conflict_get(trunk_path, self.client_ctx, pool)
self.assertTrue(isinstance(conflict, client.svn_client_conflict_t))
conflict_opts = client.conflict_tree_get_resolution_options(conflict, self.client_ctx)
self.assertTrue(isinstance(conflict_opts, list))
self.assert_all_instances_of(conflict_opts, client.svn_client_conflict_option_t)
pool.clear()
@unittest.skip("experimental API, not currently exposed")
def test_shelf(self):
"""Test shelf api."""
rev = core.svn_opt_revision_t()
rev.kind = core.svn_opt_revision_number
rev.value.number = 2
path = self.temper.alloc_empty_dir('-shelf')
client.checkout2(self.repos_uri, path, rev, rev, True, True,
self.client_ctx)
pool = core.Pool()
shelf = client._shelf_open_or_create(b"test1", path, self.client_ctx, pool)
self.assertTrue(isinstance(shelf, client.svn_client__shelf_t))
new_subpath = core.svn_relpath_join(b'trunk', b'new-shelf-test.txt')
new_path = core.svn_dirent_join(path, new_subpath)
with open(core.svn_dirent_local_style(new_path), "wb") as fp:
fp.write("A new text file\n".encode('utf8'))
client.add5(new_path, core.svn_depth_unknown, False, False, False, True, self.client_ctx, pool)
statused_paths = []
def shelf_status(path, status, pool):
statused_paths.append(path)
shelf_version = client._shelf_save_new_version3(shelf, (new_path, ), core.svn_depth_unknown,
None, shelf_status, None, pool)
self.assertTrue(isinstance(shelf_version, client.svn_client__shelf_version_t))
all_versions = client._shelf_get_all_versions(shelf, pool, pool)
self.assertEqual(1, len(all_versions))
self.assertTrue(isinstance(all_versions[0], client.svn_client__shelf_version_t))
self.assertEqual(shelf_version.version_number, all_versions[0].version_number)
self.assertIn(new_subpath, statused_paths)
client._shelf_close(shelf, pool)
pool.clear()
def suite():
return unittest.defaultTestLoader.loadTestsFromTestCase(
SubversionClientTestCase)
if __name__ == '__main__':
runner = unittest.TextTestRunner()
runner.run(suite())