blob: efc642c0106a735873709842ebb57a0f7bea8f1c [file] [log] [blame]
#!/usr/bin/python
# ====================================================================
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ====================================================================
# Run this without arguments to run unit tests.
# Run with a path to a davautocheck ops log to test that it can parse that.
import os
import re
import sys
import tempfile
try:
# Python >=3.0
from urllib.parse import quote as urllib_parse_quote
except ImportError:
# Python <3.0
from urllib import quote as urllib_parse_quote
import unittest
import svn.core
import svn_server_log_parse
class TestCase(unittest.TestCase):
    """Unit tests for svn_server_log_parse.Parser.

    Each test feeds a single log line to the parser and checks the text
    that parse() returns (the unparsed remainder of the line) and/or the
    arguments the parser handed to its handle_* callback, which setUp
    arranges to be captured in self.result.
    """

    def setUp(self):
        # Define a class to stuff everything passed to any handle_
        # method into self.result.
        class cls(svn_server_log_parse.Parser):
            def __getattr__(cls_self, attr):
                if attr.startswith('handle_'):
                    return lambda *a: setattr(self, 'result', a)
                # Name the missing attribute so that an unexpected lookup
                # produces a useful error message.
                raise AttributeError(attr)
        self.parse = cls().parse

    def test_unknown(self):
        line = 'unknown log line'
        self.parse(line)
        self.assertEqual(self.result, (line,))

    def test_open(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'open')
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'open 2 cap / SVN/1.60. fooclient')
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'open a cap=() / SVN/1.60. fooclient')
        self.assertEqual(self.parse('open 2 cap=() / SVN fooclient'), '')
        self.assertEqual(self.result, (2, [], '/', 'SVN', 'fooclient'))
        # TODO: Teach it about the capabilities, rather than allowing
        # any words at all.
        self.assertEqual(self.parse('open 2 cap=(foo) / SVN foo%20client'),
                         '')
        self.assertEqual(self.result, (2, ['foo'], '/', 'SVN', 'foo client'))

    def test_reparent(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'reparent')
        self.assertEqual(self.parse('reparent /'), '')
        self.assertEqual(self.result, ('/',))

    def test_get_latest_rev(self):
        self.assertEqual(self.parse('get-latest-rev'), '')
        self.assertEqual(self.result, ())
        self.assertEqual(self.parse('get-latest-rev r3'), 'r3')
        self.assertEqual(self.result, ())

    def test_get_dated_rev(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'get-dated-rev')
        self.assertEqual(
            self.parse('get-dated-rev 2008-04-15T20:41:24.000000Z'), '')
        self.assertEqual(self.result, ('2008-04-15T20:41:24.000000Z',))

    def test_commit(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'commit')
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'commit 3')
        self.assertEqual(self.parse('commit r3'), '')
        self.assertEqual(self.result, (3,))
        self.assertEqual(self.parse('commit r3 leftover'), ' leftover')
        self.assertEqual(self.result, (3,))

    def test_get_dir(self):
        self.get_dir_or_file('get-dir')

    def test_get_file(self):
        self.get_dir_or_file('get-file')

    def get_dir_or_file(self, c):
        """Run the checks shared by get-dir and get-file.

        c is the command word ('get-dir' or 'get-file') that is prefixed
        to every log line parsed here.
        """
        self.assertRaises(svn_server_log_parse.Error, self.parse, c)
        self.assertRaises(svn_server_log_parse.Error, self.parse, c + ' foo')
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          c + ' foo 3')
        self.assertEqual(self.parse(c + ' /a/b/c r3 ...'), ' ...')
        self.assertEqual(self.result, ('/a/b/c', 3, False, False))
        self.assertEqual(self.parse(c + ' / r3'), '')
        self.assertEqual(self.result, ('/', 3, False, False))
        # path must be absolute
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, c + ' a/b/c r3')
        self.assertEqual(self.parse(c + ' /k r27 text'), '')
        self.assertEqual(self.result, ('/k', 27, True, False))
        self.assertEqual(self.parse(c + ' /k r27 props'), '')
        self.assertEqual(self.result, ('/k', 27, False, True))
        self.assertEqual(self.parse(c + ' /k r27 text props'), '')
        self.assertEqual(self.result, ('/k', 27, True, True))
        # out of order not accepted
        self.assertEqual(self.parse(c + ' /k r27 props text'), ' text')
        self.assertEqual(self.result, ('/k', 27, False, True))

    def test_lock(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'lock')
        self.parse('lock (/foo)')
        self.assertEqual(self.result, (['/foo'], False))
        self.assertEqual(self.parse('lock (/foo) steal ...'), ' ...')
        self.assertEqual(self.result, (['/foo'], True))
        # A misspelled flag is left over as unparsed text.
        self.assertEqual(self.parse('lock (/foo) stear'), ' stear')

    def test_change_rev_prop(self):
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'change-rev-prop r3')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'change-rev-prop r svn:log')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'change-rev-prop rX svn:log')
        self.assertEqual(self.parse('change-rev-prop r3 svn:log ...'), ' ...')
        self.assertEqual(self.result, (3, 'svn:log'))

    def test_rev_proplist(self):
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'rev-proplist')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'rev-proplist r')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'rev-proplist rX')
        self.assertEqual(self.parse('rev-proplist r3 ...'), ' ...')
        self.assertEqual(self.result, (3,))

    def test_rev_prop(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'rev-prop')
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'rev-prop r')
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'rev-prop rX')
        self.assertEqual(self.parse('rev-prop r3 foo ...'), ' ...')
        self.assertEqual(self.result, (3, 'foo'))

    def test_unlock(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'unlock')
        self.parse('unlock (/foo)')
        self.assertEqual(self.result, (['/foo'], False))
        self.assertEqual(self.parse('unlock (/foo) break ...'), ' ...')
        self.assertEqual(self.result, (['/foo'], True))
        # A misspelled flag is left over as unparsed text.
        self.assertEqual(self.parse('unlock (/foo) bear'), ' bear')

    def test_get_lock(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'get-lock')
        self.parse('get-lock /foo')
        self.assertEqual(self.result, ('/foo',))

    def test_get_locks(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'get-locks')
        self.parse('get-locks /foo')
        self.assertEqual(self.result, ('/foo',))

    def test_get_locations(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'get-locations')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-locations /foo 3')
        self.assertEqual(self.parse('get-locations /foo (3 4) ...'), ' ...')
        self.assertEqual(self.result, ('/foo', [3, 4]))
        self.assertEqual(self.parse('get-locations /foo (3)'), '')
        self.assertEqual(self.result, ('/foo', [3]))

    def test_get_location_segments(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'get-location-segments')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-location-segments /foo 3')
        self.assertEqual(self.parse('get-location-segments /foo@2 r3:4'), '')
        self.assertEqual(self.result, ('/foo', 2, 3, 4))

    def test_get_file_revs(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'get-file-revs')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-file-revs /foo 3')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-file-revs /foo 3:a')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-file-revs /foo r3:a')
        self.assertEqual(self.parse('get-file-revs /foo r3:4 ...'), ' ...')
        self.assertEqual(self.result, ('/foo', 3, 4, False))
        self.assertEqual(self.parse('get-file-revs /foo r3:4'
                                    ' include-merged-revisions ...'), ' ...')
        self.assertEqual(self.result, ('/foo', 3, 4, True))

    def test_get_mergeinfo(self):
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-mergeinfo')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-mergeinfo /foo')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-mergeinfo (/foo')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-mergeinfo (/foo /bar')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'get-mergeinfo (/foo)')
        self.assertRaises(svn_server_log_parse.BadMergeinfoInheritanceError,
                          self.parse, 'get-mergeinfo (/foo) bork')
        self.assertEqual(self.parse('get-mergeinfo (/foo) explicit'), '')
        self.assertEqual(self.result, (['/foo'],
                                       svn.core.svn_mergeinfo_explicit,
                                       False))
        self.assertEqual(self.parse('get-mergeinfo (/foo /bar) inherited ...'),
                         ' ...')
        self.assertEqual(self.result, (['/foo', '/bar'],
                                       svn.core.svn_mergeinfo_inherited,
                                       False))

    def test_log(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'log')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'log /foo')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'log (/foo)')
        self.assertEqual(self.parse('log (/foo) r3:4'
                                    ' include-merged-revisions'), '')
        self.assertEqual(self.result,
                         (['/foo'], 3, 4, 0, False, False, True, []))
        self.assertEqual(self.parse('log (/foo /bar) r3:4 revprops=all ...'),
                         ' ...')
        self.assertEqual(self.result,
                         (['/foo', '/bar'], 3, 4, 0, False, False, False,
                          None))
        self.assertEqual(self.parse('log (/foo) r3:4 revprops=(a b) ...'),
                         ' ...')
        self.assertEqual(self.result,
                         (['/foo'], 3, 4, 0, False, False, False,
                          ['a', 'b']))
        self.assertEqual(self.parse('log (/foo) r8:1 limit=3'), '')
        self.assertEqual(self.result,
                         (['/foo'], 8, 1, 3, False, False, False, []))

    def test_check_path(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse,
                          'check-path')
        self.assertEqual(self.parse('check-path /foo@9'), '')
        self.assertEqual(self.result, ('/foo', 9))

    def test_stat(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'stat')
        self.assertEqual(self.parse('stat /foo@9'), '')
        self.assertEqual(self.result, ('/foo', 9))

    def test_replay(self):
        self.assertRaises(svn_server_log_parse.Error, self.parse, 'replay')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'replay /foo')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'replay (/foo) r9')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'replay (/foo) r9:10')
        self.assertEqual(self.parse('replay /foo r9'), '')
        self.assertEqual(self.result, ('/foo', 9))

    def test_checkout_or_export(self):
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'checkout-or-export')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'checkout-or-export /foo')
        self.assertEqual(self.parse('checkout-or-export /foo r9'), '')
        self.assertEqual(self.result, ('/foo', 9, svn.core.svn_depth_unknown))
        self.assertRaises(svn_server_log_parse.BadDepthError, self.parse,
                          'checkout-or-export /foo r9 depth=INVALID-DEPTH')
        self.assertRaises(svn_server_log_parse.BadDepthError, self.parse,
                          'checkout-or-export /foo r9 depth=bork')
        self.assertEqual(
            self.parse('checkout-or-export /foo r9 depth=files .'), ' .')
        self.assertEqual(self.result, ('/foo', 9, svn.core.svn_depth_files))

    def test_diff_1path(self):
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'diff')
        self.assertEqual(self.parse('diff /foo r9:10'), '')
        self.assertEqual(self.result, ('/foo', 9, 10,
                                       svn.core.svn_depth_unknown, False))
        self.assertEqual(self.parse('diff /foo r9:10'
                                    ' ignore-ancestry ...'), ' ...')
        self.assertEqual(self.result, ('/foo', 9, 10,
                                       svn.core.svn_depth_unknown, True))
        self.assertEqual(self.parse('diff /foo r9:10 depth=files'), '')
        self.assertEqual(self.result, ('/foo', 9, 10,
                                       svn.core.svn_depth_files, False))

    def test_diff_2paths(self):
        self.assertEqual(self.parse('diff /foo@9 /bar@10'), '')
        self.assertEqual(self.result, ('/foo', 9, '/bar', 10,
                                       svn.core.svn_depth_unknown, False))
        self.assertEqual(self.parse('diff /foo@9 /bar@10'
                                    ' ignore-ancestry ...'), ' ...')
        self.assertEqual(self.result, ('/foo', 9, '/bar', 10,
                                       svn.core.svn_depth_unknown, True))
        self.assertEqual(self.parse('diff /foo@9 /bar@10'
                                    ' depth=files ignore-ancestry'), '')
        self.assertEqual(self.result, ('/foo', 9, '/bar', 10,
                                       svn.core.svn_depth_files, True))

    def test_status(self):
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'status')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'status /foo')
        self.assertEqual(self.parse('status /foo r9'), '')
        self.assertEqual(self.result, ('/foo', 9, svn.core.svn_depth_unknown))
        self.assertRaises(svn_server_log_parse.BadDepthError, self.parse,
                          'status /foo r9 depth=INVALID-DEPTH')
        self.assertRaises(svn_server_log_parse.BadDepthError, self.parse,
                          'status /foo r9 depth=bork')
        self.assertEqual(self.parse('status /foo r9 depth=files .'),
                         ' .')
        self.assertEqual(self.result, ('/foo', 9, svn.core.svn_depth_files))

    def test_switch(self):
        self.assertEqual(self.parse('switch /foo /bar@10 ...'), ' ...')
        self.assertEqual(self.result, ('/foo', '/bar', 10,
                                       svn.core.svn_depth_unknown))
        self.assertEqual(self.parse('switch /foo /bar@10'
                                    ' depth=files'), '')
        self.assertEqual(self.result, ('/foo', '/bar', 10,
                                       svn.core.svn_depth_files))

    def test_update(self):
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'update')
        self.assertRaises(svn_server_log_parse.Error,
                          self.parse, 'update /foo')
        self.assertEqual(self.parse('update /foo r9'), '')
        self.assertEqual(self.result, ('/foo', 9, svn.core.svn_depth_unknown,
                                       False))
        self.assertRaises(svn_server_log_parse.BadDepthError, self.parse,
                          'update /foo r9 depth=INVALID-DEPTH')
        self.assertRaises(svn_server_log_parse.BadDepthError, self.parse,
                          'update /foo r9 depth=bork')
        self.assertEqual(self.parse('update /foo r9 depth=files .'), ' .')
        self.assertEqual(self.result, ('/foo', 9, svn.core.svn_depth_files,
                                       False))
        self.assertEqual(self.parse('update /foo r9 send-copyfrom-args .'),
                         ' .')
        self.assertEqual(self.result, ('/foo', 9, svn.core.svn_depth_unknown,
                                       True))
if __name__ == '__main__':
    if len(sys.argv) == 1:
        # No arguments so run the unit tests.
        unittest.main()
        sys.stderr.write('unittest.main failed to exit\n')
        sys.exit(2)

    # Use the argument as the path to a log file to test against.

    def uri_encode(s):
        """Percent-encode s, leaving '/', ':', '&' and '@' unescaped."""
        # urllib.parse.quote encodes :&@ characters, svn does not.
        return urllib_parse_quote(s, safe='/:&@')

    # Define a class to reconstruct the SVN-ACTION string.
    class Test(svn_server_log_parse.Parser):
        """Parser whose handlers rebuild the action text in self.action.

        Every handle_* method formats the values it receives back into a
        log action string, so the reconstructed log can be diffed against
        the original one.  self.linenum is maintained by the driver loop
        and is only read here to report unknown lines.
        """

        def handle_unknown(self, line):
            sys.stderr.write('unknown log line at %d:\n%s\n' % (self.linenum,
                                                                line))
            sys.exit(2)

        def handle_open(self, protocol, capabilities, path, ra_client,
                        client):
            capabilities = ' '.join(capabilities)
            # A missing client string is written as '-'.
            if ra_client is None:
                ra_client = '-'
            if client is None:
                client = '-'
            path = uri_encode(path)
            self.action = ('open %d cap=(%s) %s %s %s'
                           % (protocol, capabilities, path, ra_client,
                              client))

        def handle_reparent(self, path):
            path = uri_encode(path)
            self.action = 'reparent ' + path

        def handle_get_latest_rev(self):
            self.action = 'get-latest-rev'

        def handle_get_dated_rev(self, date):
            self.action = 'get-dated-rev ' + date

        def handle_commit(self, revision):
            self.action = 'commit r%d' % (revision,)

        def handle_get_dir(self, path, revision, text, props):
            path = uri_encode(path)
            self.action = 'get-dir %s r%d' % (path, revision)
            if text:
                self.action += ' text'
            if props:
                self.action += ' props'

        def handle_get_file(self, path, revision, text, props):
            path = uri_encode(path)
            self.action = 'get-file %s r%d' % (path, revision)
            if text:
                self.action += ' text'
            if props:
                self.action += ' props'

        def handle_lock(self, paths, steal):
            paths = [uri_encode(x) for x in paths]
            self.action = 'lock (%s)' % (' '.join(paths),)
            if steal:
                self.action += ' steal'

        def handle_change_rev_prop(self, revision, revprop):
            revprop = uri_encode(revprop)
            self.action = 'change-rev-prop r%d %s' % (revision, revprop)

        def handle_rev_prop(self, revision, revprop):
            revprop = uri_encode(revprop)
            self.action = 'rev-prop r%d %s' % (revision, revprop)

        def handle_rev_proplist(self, revision):
            self.action = 'rev-proplist r%d' % (revision,)

        def handle_unlock(self, paths, break_lock):
            paths = [uri_encode(x) for x in paths]
            self.action = 'unlock (%s)' % (' '.join(paths),)
            if break_lock:
                self.action += ' break'

        def handle_get_lock(self, path):
            path = uri_encode(path)
            self.action = 'get-lock ' + path

        def handle_get_locks(self, path):
            # Encode before building the action, like every other handler;
            # previously the raw path was used and the encoded value was
            # thrown away.
            path = uri_encode(path)
            self.action = 'get-locks ' + path

        def handle_get_locations(self, path, revisions):
            path = uri_encode(path)
            self.action = ('get-locations %s (%s)'
                           % (path, ' '.join([str(x) for x in revisions])))

        def handle_get_location_segments(self, path, peg, left, right):
            path = uri_encode(path)
            self.action = 'get-location-segments %s@%d r%d:%d' % (path, peg,
                                                                  left, right)

        def handle_get_file_revs(self, path, left, right,
                                 include_merged_revisions):
            path = uri_encode(path)
            self.action = 'get-file-revs %s r%d:%d' % (path, left, right)
            if include_merged_revisions:
                self.action += ' include-merged-revisions'

        def handle_get_mergeinfo(self, paths, inheritance,
                                 include_descendants):
            paths = [uri_encode(x) for x in paths]
            self.action = ('get-mergeinfo (%s) %s'
                           % (' '.join(paths),
                              svn.core.svn_inheritance_to_word(inheritance)))
            if include_descendants:
                self.action += ' include-descendants'

        def handle_log(self, paths, left, right, limit,
                       discover_changed_paths, strict,
                       include_merged_revisions, revprops):
            paths = [uri_encode(x) for x in paths]
            self.action = 'log (%s) r%d:%d' % (' '.join(paths),
                                               left, right)
            if limit != 0:
                self.action += ' limit=%d' % (limit,)
            if discover_changed_paths:
                self.action += ' discover-changed-paths'
            if strict:
                self.action += ' strict'
            if include_merged_revisions:
                self.action += ' include-merged-revisions'
            # None is written as "all"; an empty list adds nothing.
            if revprops is None:
                self.action += ' revprops=all'
            elif len(revprops) > 0:
                revprops = [uri_encode(x) for x in revprops]
                self.action += ' revprops=(%s)' % (' '.join(revprops),)

        def handle_check_path(self, path, revision):
            path = uri_encode(path)
            self.action = 'check-path %s@%d' % (path, revision)

        def handle_stat(self, path, revision):
            path = uri_encode(path)
            self.action = 'stat %s@%d' % (path, revision)

        def handle_replay(self, path, revision):
            path = uri_encode(path)
            self.action = 'replay %s r%d' % (path, revision)

        def maybe_depth(self, depth):
            """Append ' depth=WORD' to self.action unless depth is unknown."""
            if depth != svn.core.svn_depth_unknown:
                self.action += ' depth=%s' % (
                    svn.core.svn_depth_to_word(depth),)

        def handle_checkout_or_export(self, path, revision, depth):
            path = uri_encode(path)
            self.action = 'checkout-or-export %s r%d' % (path, revision)
            self.maybe_depth(depth)

        def handle_diff_1path(self, path, left, right,
                              depth, ignore_ancestry):
            path = uri_encode(path)
            self.action = 'diff %s r%d:%d' % (path, left, right)
            self.maybe_depth(depth)
            if ignore_ancestry:
                self.action += ' ignore-ancestry'

        def handle_diff_2paths(self, from_path, from_rev,
                               to_path, to_rev,
                               depth, ignore_ancestry):
            from_path = uri_encode(from_path)
            to_path = uri_encode(to_path)
            self.action = ('diff %s@%d %s@%d'
                           % (from_path, from_rev, to_path, to_rev))
            self.maybe_depth(depth)
            if ignore_ancestry:
                self.action += ' ignore-ancestry'

        def handle_status(self, path, revision, depth):
            path = uri_encode(path)
            self.action = 'status %s r%d' % (path, revision)
            self.maybe_depth(depth)

        def handle_switch(self, from_path, to_path, to_rev, depth):
            from_path = uri_encode(from_path)
            to_path = uri_encode(to_path)
            self.action = ('switch %s %s@%d'
                           % (from_path, to_path, to_rev))
            self.maybe_depth(depth)

        def handle_update(self, path, revision, depth, send_copyfrom_args):
            path = uri_encode(path)
            self.action = 'update %s r%d' % (path, revision)
            self.maybe_depth(depth)
            if send_copyfrom_args:
                self.action += ' send-copyfrom-args'

    # mkstemp creates the file atomically; mktemp only returns a name and
    # leaves a window in which another process can claim it.
    tmp_fd, tmp = tempfile.mkstemp()
    try:
        parser = Test()
        parser.linenum = 0
        log_file = sys.argv[1]
        log_type = None
        # The with blocks close both files on every exit path, and make
        # sure the reconstructed log is flushed before diff reads it.
        with os.fdopen(tmp_fd, 'w') as fp:
            with open(log_file) as log_fp:
                for line in log_fp:
                    if log_type is None:
                        # Figure out which log type we have.
                        if re.match(r'\d+ \d\d\d\d-', line):
                            log_type = 'svnserve'
                        elif re.match(r'\[\d\d/', line):
                            log_type = 'mod_dav_svn'
                        else:
                            sys.stderr.write("unknown log format in '%s'\n"
                                             % (log_file,))
                            sys.exit(3)
                        sys.stderr.write('parsing %s log...\n' % (log_type,))
                        sys.stderr.flush()

                    words = line.split()
                    if log_type == 'svnserve':
                        # Skip over PID, date, client address, username,
                        # and repos.
                        if words[5].startswith('ERR'):
                            # Skip error lines.
                            fp.write(line)
                            continue
                        leading = ' '.join(words[:5])
                        action = ' '.join(words[5:])
                    else:
                        # Find the SVN-ACTION string from the CustomLog
                        # format davautocheck.sh uses.  If that changes,
                        # this will need to as well.  Currently it's
                        # %t %u %{SVN-REPOS-NAME}e %{SVN-ACTION}e
                        leading = ' '.join(words[:4])
                        action = ' '.join(words[4:])

                    # Parse the action and write the reconstructed action
                    # to the temporary file.  Ignore the returned trailing
                    # text, as we have none in the davautocheck ops log.
                    parser.linenum += 1
                    try:
                        parser.parse(action)
                    except svn_server_log_parse.Error:
                        sys.stderr.write('error at line %d: %s\n'
                                         % (parser.linenum, action))
                        raise
                    fp.write(leading + ' ' + parser.action + '\n')

        # Check differences between original and reconstructed files
        # (should be identical).
        result = os.spawnlp(os.P_WAIT, 'diff', 'diff', '-u', log_file, tmp)
        if result == 0:
            sys.stderr.write('OK\n')
        sys.exit(result)
    finally:
        # Best-effort cleanup: report, but do not fail on, unlink errors.
        try:
            os.unlink(tmp)
        except Exception as e:
            sys.stderr.write('os.unlink(tmp): %s\n' % (e,))