| # |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # |
| import unittest, setup_path, os, sys, weakref |
| from sys import version_info # For Python version check |
| from io import BytesIO |
| from svn import core, repos, fs, delta |
| from svn.core import SubversionException, Pool |
| import utils |
| |
| class ChangeReceiver(delta.Editor): |
| """A delta editor which saves textdeltas for later use""" |
| |
| def __init__(self, src_root, tgt_root): |
| self.src_root = src_root |
| self.tgt_root = tgt_root |
| self.textdeltas = [] |
| |
| def apply_textdelta(self, file_baton, base_checksum, pool=None): |
| def textdelta_handler(textdelta): |
| if textdelta is not None: |
| self.textdeltas.append(textdelta) |
| return textdelta_handler |
| |
| class DumpStreamParser(repos.ParseFns3): |
| def __init__(self, stream=None, pool=None): |
| repos.ParseFns3.__init__(self) |
| self.stream = stream |
| self.ops = [] |
| # for leak checking only. If the parse_fns3 object holds some proxy |
| # object allocated from 'pool' or the 'pool' itself, the 'pool' is not |
| # destroyed until the parse_fns3 object is removed. |
| self.pool = pool |
| def _close_dumpstream(self): |
| if self.stream: |
| self.stream.close() |
| self.stream = None |
| if self.pool: |
| self.pool = None |
| def magic_header_record(self, version, pool=None): |
| self.ops.append((b"magic-header", version)) |
| def uuid_record(self, uuid, pool=None): |
| self.ops.append((b"uuid", uuid)) |
| def new_revision_record(self, headers, pool=None): |
| rev = int(headers[repos.DUMPFILE_REVISION_NUMBER]) |
| self.ops.append((b"new-revision", rev)) |
| return rev |
| def close_revision(self, revision_baton): |
| self.ops.append((b"close-revision", revision_baton)) |
| def new_node_record(self, headers, revision_baton, pool=None): |
| node = headers[repos.DUMPFILE_NODE_PATH] |
| self.ops.append((b"new-node", revision_baton, node)) |
| return (revision_baton, node) |
| def close_node(self, node_baton): |
| self.ops.append((b"close-node", node_baton[0], node_baton[1])) |
| def set_revision_property(self, revision_baton, name, value): |
| self.ops.append((b"set-revision-prop", revision_baton, name, value)) |
| def set_node_property(self, node_baton, name, value): |
| self.ops.append((b"set-node-prop", node_baton[0], node_baton[1], name, value)) |
| def remove_node_props(self, node_baton): |
| self.ops.append((b"remove-node-props", node_baton[0], node_baton[1])) |
| def delete_node_property(self, node_baton, name): |
| self.ops.append((b"delete-node-prop", node_baton[0], node_baton[1], name)) |
| def apply_textdelta(self, node_baton): |
| self.ops.append((b"apply-textdelta", node_baton[0], node_baton[1])) |
| return None |
| def set_fulltext(self, node_baton): |
| self.ops.append((b"set-fulltext", node_baton[0], node_baton[1])) |
| return None |
| |
| class BatonCollector(repos.ChangeCollector): |
| """A ChangeCollector with collecting batons, too""" |
| def __init__(self, fs_ptr, root, pool=None, notify_cb=None): |
| repos.ChangeCollector.__init__(self, fs_ptr, root, pool, notify_cb) |
| self.batons = [] |
| self.close_called = False |
| self.abort_called = False |
| |
| def open_root(self, base_revision, dir_pool=None): |
| bt = repos.ChangeCollector.open_root(self, base_revision, dir_pool) |
| self.batons.append((b'dir baton', b'', bt, sys.getrefcount(bt))) |
| return bt |
| |
| def add_directory(self, path, parent_baton, |
| copyfrom_path, copyfrom_revision, dir_pool=None): |
| bt = repos.ChangeCollector.add_directory(self, path, parent_baton, |
| copyfrom_path, |
| copyfrom_revision, |
| dir_pool) |
| self.batons.append((b'dir baton', path, bt, sys.getrefcount(bt))) |
| return bt |
| |
| def open_directory(self, path, parent_baton, base_revision, |
| dir_pool=None): |
| bt = repos.ChangeCollector.open_directory(self, path, parent_baton, |
| base_revision, dir_pool) |
| self.batons.append((b'dir baton', path, bt, sys.getrefcount(bt))) |
| return bt |
| |
| def add_file(self, path, parent_baton, |
| copyfrom_path, copyfrom_revision, file_pool=None): |
| bt = repos.ChangeCollector.add_file(self, path, parent_baton, |
| copyfrom_path, copyfrom_revision, |
| file_pool) |
| self.batons.append((b'file baton', path, bt, sys.getrefcount(bt))) |
| return bt |
| |
| def open_file(self, path, parent_baton, base_revision, file_pool=None): |
| bt = repos.ChangeCollector.open_file(self, path, parent_baton, |
| base_revision, file_pool) |
| self.batons.append((b'file baton', path, bt, sys.getrefcount(bt))) |
| return bt |
| |
| def close_edit(self, pool=None): |
| self.close_called = True |
| return |
| |
| def abort_edit(self, pool=None): |
| self.abort_called = True |
| return |
| |
| class BatonCollectorErrorOnClose(BatonCollector): |
| """Same as BatonCollector, but raises an Exception when close the |
| file/dir specfied by error_path""" |
| def __init__(self, fs_ptr, root, pool=None, notify_cb=None, error_path=b''): |
| BatonCollector.__init__(self, fs_ptr, root, pool, notify_cb) |
| self.error_path = error_path |
| |
| def close_directory(self, dir_baton): |
| if dir_baton[0] == self.error_path: |
| raise SubversionException('A Dummy Exception!', core.SVN_ERR_BASE) |
| else: |
| BatonCollector.close_directory(self, dir_baton) |
| |
| def close_file(self, file_baton, text_checksum): |
| if file_baton[0] == self.error_path: |
| raise SubversionException('A Dummy Exception!', core.SVN_ERR_BASE) |
| else: |
| return BatonCollector.close_file(self, file_baton, text_checksum) |
| |
| |
| def _authz_callback(root, path, pool): |
| "A dummy authz callback which always returns success." |
| return 1 |
| |
| class SubversionRepositoryTestCase(unittest.TestCase): |
| """Test cases for the Subversion repository layer""" |
| |
| def setUp(self): |
| """Load a Subversion repository""" |
| self.temper = utils.Temper() |
| (self.repos, self.repos_path, _) = self.temper.alloc_known_repo( |
| 'trac/versioncontrol/tests/svnrepos.dump', suffix='-repository') |
| self.fs = repos.fs(self.repos) |
| self.rev = fs.youngest_rev(self.fs) |
| |
| def tearDown(self): |
| self.fs = None |
| self.repos = None |
| self.temper.cleanup() |
| |
| def test_cease_invocation(self): |
| """Test returning SVN_ERR_CEASE_INVOCATION from a callback""" |
| |
| revs = [] |
| def history_lookup(path, rev, pool): |
| revs.append(rev) |
| raise core.SubversionException(apr_err=core.SVN_ERR_CEASE_INVOCATION, |
| message="Hi from history_lookup") |
| |
| repos.history2(self.fs, b'/trunk/README2.txt', history_lookup, None, 0, |
| self.rev, True) |
| self.assertEqual(len(revs), 1) |
| |
| def test_create(self): |
| """Make sure that repos.create doesn't segfault when we set fs-type |
| using a config hash""" |
| fs_config = { b"fs-type": b"fsfs" } |
| for i in range(5): |
| path = self.temper.alloc_empty_dir(suffix='-repository-create%d' % i) |
| repos.create(path, b"", b"", None, fs_config) |
| |
| def test_dump_fs2(self): |
| """Test the dump_fs2 function""" |
| |
| self.callback_calls = 0 |
| |
| def is_cancelled(): |
| self.callback_calls += 1 |
| return None |
| |
| dumpstream = BytesIO() |
| feedbackstream = BytesIO() |
| repos.dump_fs2(self.repos, dumpstream, feedbackstream, 0, self.rev, 0, 0, |
| is_cancelled) |
| |
| # Check that we can dump stuff |
| dump = dumpstream.getvalue() |
| feedback = feedbackstream.getvalue() |
| expected_feedback = b"* Dumped revision " + str(self.rev).encode('utf-8') |
| self.assertEqual(dump.count(b"Node-path: trunk/README.txt"), 2) |
| self.assertEqual(feedback.count(expected_feedback), 1) |
| self.assertEqual(self.callback_calls, 13) |
| |
| # Check that the dump can be cancelled |
| self.assertRaises(SubversionException, repos.dump_fs2, |
| self.repos, dumpstream, feedbackstream, 0, self.rev, 0, 0, lambda: 1) |
| |
| dumpstream.close() |
| feedbackstream.close() |
| |
| # Check that the dump fails when the dumpstream is closed |
| self.assertRaises(ValueError, repos.dump_fs2, |
| self.repos, dumpstream, feedbackstream, 0, self.rev, 0, 0, None) |
| |
| dumpstream = BytesIO() |
| feedbackstream = BytesIO() |
| |
| # Check that we can grab the feedback stream, but not the dumpstream |
| repos.dump_fs2(self.repos, None, feedbackstream, 0, self.rev, 0, 0, None) |
| feedback = feedbackstream.getvalue() |
| self.assertEqual(feedback.count(expected_feedback), 1) |
| |
| # Check that we can grab the dumpstream, but not the feedbackstream |
| repos.dump_fs2(self.repos, dumpstream, None, 0, self.rev, 0, 0, None) |
| dump = dumpstream.getvalue() |
| self.assertEqual(dump.count(b"Node-path: trunk/README.txt"), 2) |
| |
| # Check that we can ignore both the dumpstream and the feedbackstream |
| repos.dump_fs2(self.repos, dumpstream, None, 0, self.rev, 0, 0, None) |
| self.assertEqual(feedback.count(expected_feedback), 1) |
| |
| # FIXME: The Python bindings don't check for 'NULL' values for |
| # svn_repos_t objects, so the following call segfaults |
| #repos.dump_fs2(None, None, None, 0, self.rev, 0, 0, None) |
| |
| def test_parse_fns3(self): |
| self.cancel_calls = 0 |
| def is_cancelled(): |
| self.cancel_calls += 1 |
| return None |
| pool = Pool() |
| subpool = Pool(pool) |
| dump_path = os.path.join(os.path.dirname(sys.argv[0]), |
| "trac/versioncontrol/tests/svnrepos.dump") |
| stream = open(dump_path, 'rb') |
| dsp = DumpStreamParser(stream, subpool) |
| dsp_ref = weakref.ref(dsp) |
| ptr, baton = repos.make_parse_fns3(dsp, subpool) |
| repos.parse_dumpstream3(stream, ptr, baton, False, is_cancelled) |
| self.assertEqual(self.cancel_calls, 76) |
| expected_list = [ |
| (b"magic-header", 2), |
| (b'uuid', b'92ea810a-adf3-0310-b540-bef912dcf5ba'), |
| (b'new-revision', 0), |
| (b'set-revision-prop', 0, b'svn:date', b'2005-04-01T09:57:41.312767Z'), |
| (b'close-revision', 0), |
| (b'new-revision', 1), |
| (b'set-revision-prop', 1, b'svn:log', b'Initial directory layout.'), |
| (b'set-revision-prop', 1, b'svn:author', b'john'), |
| (b'set-revision-prop', 1, b'svn:date', b'2005-04-01T10:00:52.353248Z'), |
| (b'new-node', 1, b'branches'), |
| (b'remove-node-props', 1, b'branches'), |
| (b'close-node', 1, b'branches'), |
| (b'new-node', 1, b'tags'), |
| (b'remove-node-props', 1, b'tags'), |
| (b'close-node', 1, b'tags'), |
| (b'new-node', 1, b'trunk'), |
| (b'remove-node-props', 1, b'trunk'), |
| (b'close-node', 1, b'trunk'), |
| (b'close-revision', 1), |
| (b'new-revision', 2), |
| (b'set-revision-prop', 2, b'svn:log', b'Added README.'), |
| (b'set-revision-prop', 2, b'svn:author', b'john'), |
| (b'set-revision-prop', 2, b'svn:date', b'2005-04-01T13:12:18.216267Z'), |
| (b'new-node', 2, b'trunk/README.txt'), |
| (b'remove-node-props', 2, b'trunk/README.txt'), |
| (b'set-fulltext', 2, b'trunk/README.txt'), |
| (b'close-node', 2, b'trunk/README.txt'), |
| (b'close-revision', 2), (b'new-revision', 3), |
| (b'set-revision-prop', 3, b'svn:log', b'Fixed README.\n'), |
| (b'set-revision-prop', 3, b'svn:author', b'kate'), |
| (b'set-revision-prop', 3, b'svn:date', b'2005-04-01T13:24:58.234643Z'), |
| (b'new-node', 3, b'trunk/README.txt'), |
| (b'remove-node-props', 3, b'trunk/README.txt'), |
| (b'set-node-prop', 3, b'trunk/README.txt', b'svn:mime-type', b'text/plain'), |
| (b'set-node-prop', 3, b'trunk/README.txt', b'svn:eol-style', b'native'), |
| (b'set-fulltext', 3, b'trunk/README.txt'), |
| (b'close-node', 3, b'trunk/README.txt'), (b'close-revision', 3), |
| ] |
| # Compare only the first X nodes described in the expected list - otherwise |
| # the comparison list gets too long. |
| self.assertEqual(dsp.ops[:len(expected_list)], expected_list) |
| |
| # _close_dumpstream should be invoked after 'baton' is removed. |
| self.assertEqual(False, stream.closed) |
| del ptr, baton, subpool, dsp |
| self.assertEqual(True, stream.closed) |
| # Issue SVN-4918 |
| self.assertEqual(None, dsp_ref()) |
| |
| def test_parse_fns3_invalid_set_fulltext(self): |
| class DumpStreamParserSubclass(DumpStreamParser): |
| def set_fulltext(self, node_baton): |
| DumpStreamParser.set_fulltext(self, node_baton) |
| return 42 |
| stream = open(os.path.join(os.path.dirname(sys.argv[0]), |
| "trac/versioncontrol/tests/svnrepos.dump"), "rb") |
| try: |
| dsp = DumpStreamParserSubclass() |
| ptr, baton = repos.make_parse_fns3(dsp) |
| self.assertRaises(TypeError, repos.parse_dumpstream3, |
| stream, ptr, baton, False, None) |
| finally: |
| stream.close() |
| |
| def test_get_logs(self): |
| """Test scope of get_logs callbacks""" |
| logs = [] |
| def addLog(paths, revision, author, date, message, pool): |
| if paths is not None: |
| logs.append(paths) |
| |
| # Run get_logs |
| repos.get_logs(self.repos, [b'/'], self.rev, 0, True, 0, addLog) |
| |
| # Count and verify changes |
| change_count = 0 |
| for log in logs: |
| for path_changed in core._as_list(log.values()): |
| change_count += 1 |
| path_changed.assert_valid() |
| self.assertEqual(logs[2][b"/tags/v1.1"].action, b"A") |
| self.assertEqual(logs[2][b"/tags/v1.1"].copyfrom_path, b"/branches/v1x") |
| self.assertEqual(len(logs), 12) |
| self.assertEqual(change_count, 19) |
| |
| def test_dir_delta(self): |
| """Test scope of dir_delta callbacks""" |
| # Run dir_delta |
| this_root = fs.revision_root(self.fs, self.rev) |
| prev_root = fs.revision_root(self.fs, self.rev-1) |
| editor = ChangeReceiver(this_root, prev_root) |
| e_ptr, e_baton = delta.make_editor(editor) |
| repos.dir_delta(prev_root, b'', b'', this_root, b'', e_ptr, e_baton, |
| _authz_callback, 1, 1, 0, 0) |
| |
| # Check results. |
| # Ignore the order in which the editor delivers the two sibling files. |
| self.assertEqual(set([editor.textdeltas[0].new_data, |
| editor.textdeltas[1].new_data]), |
| set([b"This is a test.\n", b"A test.\n"])) |
| self.assertEqual(len(editor.textdeltas), 2) |
| |
| def test_unnamed_editor(self): |
| """Test editor object without reference from interpreter""" |
| # Check that the delta.Editor object has proper lifetime. Without |
| # increment of the refcount in make_baton, the object was destroyed |
| # immediately because the interpreter does not hold a reference to it. |
| this_root = fs.revision_root(self.fs, self.rev) |
| prev_root = fs.revision_root(self.fs, self.rev-1) |
| e_ptr, e_baton = delta.make_editor(ChangeReceiver(this_root, prev_root)) |
| repos.dir_delta(prev_root, b'', b'', this_root, b'', e_ptr, e_baton, |
| _authz_callback, 1, 1, 0, 0) |
| |
| def test_delta_editor_leak_with_change_collector(self): |
| pool = Pool() |
| subpool = Pool(pool) |
| root = fs.revision_root(self.fs, self.rev, subpool) |
| editor = repos.ChangeCollector(self.fs, root, subpool) |
| editor_ref = weakref.ref(editor) |
| e_ptr, e_baton = delta.make_editor(editor, subpool) |
| repos.replay(root, e_ptr, e_baton, subpool) |
| |
| fs.close_root(root) |
| del root |
| self.assertNotEqual(None, editor_ref()) |
| |
| del e_ptr, e_baton, editor |
| del subpool |
| self.assertEqual(None, editor_ref()) |
| |
| def test_replay_batons_refcounts(self): |
| """Issue SVN-4917: check ref-count of batons created and used in call backs""" |
| root = fs.revision_root(self.fs, self.rev) |
| editor = BatonCollector(self.fs, root) |
| e_ptr, e_baton = delta.make_editor(editor) |
| repos.replay(root, e_ptr, e_baton) |
| for baton in editor.batons: |
| self.assertEqual(sys.getrefcount(baton[2]), 2, |
| "leak on baton %s after replay without errors" |
| % repr(baton)) |
| del e_baton |
| self.assertEqual(sys.getrefcount(e_ptr), 2, |
| "leak on editor baton after replay without errors") |
| |
| editor = BatonCollectorErrorOnClose(self.fs, root, |
| error_path=b'branches/v1x') |
| e_ptr, e_baton = delta.make_editor(editor) |
| self.assertRaises(SubversionException, repos.replay, root, e_ptr, e_baton) |
| batons= editor.batons |
| # As svn_repos_replay calls neigher close_edit callback nor abort_edit |
| # if an error has occured during processing, references of Python objects |
| # in decendant batons may live until e_baton is deleted. |
| del e_baton |
| for baton in batons: |
| self.assertEqual(sys.getrefcount(baton[2]), 2, |
| "leak on baton %s after replay with an error" |
| % repr(baton)) |
| self.assertEqual(sys.getrefcount(e_ptr), 2, |
| "leak on editor baton after replay with an error") |
| |
| def test_retrieve_and_change_rev_prop(self): |
| """Test playing with revprops""" |
| self.assertEqual(repos.fs_revision_prop(self.repos, self.rev, b"svn:log", |
| _authz_callback), |
| b"''(a few years later)'' Argh... v1.1 was buggy, " |
| b"after all") |
| |
| # We expect this to complain because we have no pre-revprop-change |
| # hook script for the repository. |
| self.assertRaises(SubversionException, repos.fs_change_rev_prop3, |
| self.repos, self.rev, b"jrandom", b"svn:log", |
| b"Youngest revision", True, True, _authz_callback) |
| |
| repos.fs_change_rev_prop3(self.repos, self.rev, b"jrandom", b"svn:log", |
| b"Youngest revision", False, False, |
| _authz_callback) |
| |
| self.assertEqual(repos.fs_revision_prop(self.repos, self.rev, b"svn:log", |
| _authz_callback), |
| b"Youngest revision") |
| |
| def freeze_body(self, pool): |
| self.freeze_invoked += 1 |
| |
| def test_freeze(self): |
| """Test repository freeze""" |
| |
| self.freeze_invoked = 0 |
| repos.freeze([self.repos_path], self.freeze_body) |
| self.assertEqual(self.freeze_invoked, 1) |
| |
| def test_lock_unlock(self): |
| """Basic lock/unlock""" |
| |
| access = fs.create_access(b'jrandom') |
| fs.set_access(self.fs, access) |
| fs.lock(self.fs, b'/trunk/README.txt', None, None, 0, 0, self.rev, False) |
| try: |
| fs.lock(self.fs, b'/trunk/README.txt', None, None, 0, 0, self.rev, False) |
| except core.SubversionException as exc: |
| self.assertEqual(exc.apr_err, core.SVN_ERR_FS_PATH_ALREADY_LOCKED) |
| fs.lock(self.fs, b'/trunk/README.txt', None, None, 0, 0, self.rev, True) |
| |
| self.calls = 0 |
| self.errors = 0 |
| def unlock_callback(path, lock, err, pool): |
| self.assertEqual(path, b'/trunk/README.txt') |
| self.assertEqual(lock, None) |
| self.calls += 1 |
| if err != None: |
| self.assertEqual(err.apr_err, core.SVN_ERR_FS_NO_SUCH_LOCK) |
| self.errors += 1 |
| |
| the_lock = fs.get_lock(self.fs, b'/trunk/README.txt') |
| fs.unlock_many(self.fs, {b'/trunk/README.txt':the_lock.token}, False, |
| unlock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.errors, 0) |
| |
| self.calls = 0 |
| fs.unlock_many(self.fs, {b'/trunk/README.txt':the_lock.token}, False, |
| unlock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.errors, 1) |
| |
| self.locks = 0 |
| def lock_callback(path, lock, err, pool): |
| self.assertEqual(path, b'/trunk/README.txt') |
| if lock != None: |
| self.assertEqual(lock.owner, b'jrandom') |
| self.locks += 1 |
| self.calls += 1 |
| if err != None: |
| self.assertEqual(err.apr_err, core.SVN_ERR_FS_PATH_ALREADY_LOCKED) |
| self.errors += 1 |
| |
| self.calls = 0 |
| self.errors = 0 |
| target = fs.lock_target_create(None, self.rev) |
| fs.lock_many(self.fs, {b'trunk/README.txt':target}, |
| None, False, 0, False, lock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.locks, 1) |
| self.assertEqual(self.errors, 0) |
| |
| self.calls = 0 |
| self.locks = 0 |
| fs.lock_many(self.fs, {b'trunk/README.txt':target}, |
| None, False, 0, False, lock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.locks, 0) |
| self.assertEqual(self.errors, 1) |
| |
| self.calls = 0 |
| self.errors = 0 |
| the_lock = fs.get_lock(self.fs, b'/trunk/README.txt') |
| repos.fs_unlock_many(self.repos, {b'trunk/README.txt':the_lock.token}, |
| False, unlock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.errors, 0) |
| |
| self.calls = 0 |
| repos.fs_unlock_many(self.repos, {b'trunk/README.txt':the_lock.token}, |
| False, unlock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.errors, 1) |
| |
| self.calls = 0 |
| self.errors = 0 |
| repos.fs_lock_many(self.repos, {b'trunk/README.txt':target}, |
| None, False, 0, False, lock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.locks, 1) |
| self.assertEqual(self.errors, 0) |
| |
| self.calls = 0 |
| self.locks = 0 |
| repos.fs_lock_many(self.repos, {b'trunk/README.txt':target}, |
| None, False, 0, False, lock_callback) |
| self.assertEqual(self.calls, 1) |
| self.assertEqual(self.locks, 0) |
| self.assertEqual(self.errors, 1) |
| |
| def suite(): |
| return unittest.defaultTestLoader.loadTestsFromTestCase( |
| SubversionRepositoryTestCase) |
| |
| if __name__ == '__main__': |
| runner = unittest.TextTestRunner() |
| runner.run(suite()) |