blob: 3896bcf3ded843d10edb277bfcc7b0f82c1d8c0d [file] [log] [blame]
#!/usr/bin/env python
"""Functional testing framework for command line applications"""
import difflib
import itertools
import os
import re
import subprocess
import sys
import shutil
import time
import tempfile
# Public API: the names exported by ``from <module> import *``.
__all__ = ['main', 'test']
def findtests(paths):
    """Yield tests in paths in sorted order.

    Each entry of paths is either a directory or a single file. A
    directory is searched recursively for files ending in '.t'; files
    and subdirectories whose names start with '.' are ignored. Any
    other path is yielded as-is, whatever its name. All yielded paths
    are normalized with os.path.normpath().

    Within a directory, its own tests come first (sorted by name),
    followed by the tests of each subdirectory in sorted order.
    """
    for p in paths:
        if not os.path.isdir(p):
            yield os.path.normpath(p)
            continue
        for root, dirs, files in os.walk(p):
            # Prune hidden subdirectories in place so os.walk() never
            # descends into them, and sort the rest so traversal order is
            # deterministic. The directory given by the caller is always
            # searched, even if it is '.' or otherwise starts with a dot.
            dirs[:] = sorted([d for d in dirs if not d.startswith('.')])
            for f in sorted(files):
                if not f.startswith('.') and f.endswith('.t'):
                    yield os.path.normpath(os.path.join(root, f))
def regex(pattern, s):
    """Match a regular expression or return False if invalid.

    The pattern must match all of s, not just a prefix. Returns the
    match object on success, None when s does not match, and False when
    pattern is not a valid regular expression.

    >>> [bool(regex(r, 'foobar')) for r in ('foo.*', '***')]
    [True, False]
    >>> bool(regex('foo|bar', 'foobar'))
    False
    """
    try:
        compiled = re.compile(pattern)
        if hasattr(compiled, 'fullmatch'):
            return compiled.fullmatch(s)
        # Older interpreters have no fullmatch(). Group the pattern before
        # anchoring it; a bare trailing \Z would only bind to the last
        # alternative of a pattern such as 'foo|bar'.
        return re.match('(?:%s)\\Z' % pattern, s)
    except re.error:
        return False
def glob(el, l):
    """Match a glob-like pattern.

    The only supported special characters are * and ?. Escaping is
    supported: a backslash before *, ? or another backslash makes that
    character literal. A backslash anywhere else, including at the very
    end of the pattern, stands for itself.

    el is the pattern and l the text to test; the pattern must match all
    of l. Returns a match object on success and None otherwise.

    >>> bool(glob(r'\* \\ \? fo?b*', '* \\ ? foobar'))
    True
    >>> bool(glob('trailing\\', 'trailing\\'))
    True
    """
    i, n = 0, len(el)
    parts = []
    while i < n:
        c = el[i]
        i += 1
        # The bounds check keeps a trailing lone backslash from indexing
        # past the end of the pattern.
        if c == '\\' and i < n and el[i] in '*?\\':
            # Backslash plus the escaped character is already valid regex.
            parts.append(el[i - 1:i + 1])
            i += 1
        elif c == '*':
            parts.append('.*')
        elif c == '?':
            parts.append('.')
        else:
            parts.append(re.escape(c))
    # Every piece above is escaped or a fixed token, so the translated
    # pattern is always valid and contains no alternation; anchoring it
    # with a trailing \Z is enough to require a full match.
    return re.match(''.join(parts) + r'\Z', l)
# Maps the keyword that may end an expected-output line, written as
# " (glob)" or " (re)", to the function that checks that line's pattern
# against the actual output. Consulted by match().
annotations = {'glob': glob, 're': regex}
def match(el, l):
    """Tell whether expected line el matches actual line l.

    el only counts as a pattern when it ends with one of the known
    annotation suffixes, e.g. ' (re)' followed by a newline. The suffix
    is stripped from el, the last character is stripped from l, and the
    annotation's matcher decides. Returns True or False.
    """
    actual = l[:-1]
    for keyword, matcher in annotations.items():
        suffix = ' (%s)\n' % keyword
        if not el.endswith(suffix):
            continue
        pattern = el[:-len(suffix)]
        if matcher(pattern, actual):
            return True
    return False
class SequenceMatcher(difflib.SequenceMatcher, object):
    """A difflib.SequenceMatcher that also honours glob/regex lines"""

    def find_longest_match(self, alo, ahi, blo, bhi):
        """Find longest matching block in a[alo:ahi] and b[blo:bhi]"""
        # The base class calls this repeatedly on ever smaller ranges, so
        # the same pattern line may be tested against output many times.
        expected = self.a[alo:ahi]
        actual = self.b[blo:bhi]
        swapped = []
        for offset in range(min(len(expected), len(actual))):
            pattern, line = expected[offset], actual[offset]
            if pattern == line or not match(pattern, line):
                continue
            # Temporarily overwrite the pattern in a (expected output)
            # with the line from b (actual output) it matches, so that
            # the base class sees the two as identical.
            self.a[alo + offset] = line
            swapped.append((alo + offset, pattern))

        base = super(SequenceMatcher, self)
        longest = base.find_longest_match(alo, ahi, blo, bhi)

        # Put the patterns back; otherwise the diff would show the test as
        # if it had never contained any globs or regexes.
        for index, pattern in swapped:
            self.a[index] = pattern
        return longest
def unified_diff(a, b, fromfile='', tofile='', fromfiledate='',
                 tofiledate='', n=3, lineterm='\n', matcher=SequenceMatcher):
    """Compare two sequences of lines; generate the delta as a unified diff.

    This is like difflib.unified_diff(), but allows custom matchers.

    a and b are the "from" and "to" line sequences. fromfile/tofile and
    fromfiledate/tofiledate fill in the '---' and '+++' header lines; a
    date is appended after a tab only when it is non-empty. n is the
    number of context lines per hunk and lineterm terminates the header
    and hunk-marker lines. matcher is called as matcher(None, a, b) and
    must provide get_grouped_opcodes().

    This is a generator; it yields nothing when there are no differences.
    """
    started = False
    for group in matcher(None, a, b).get_grouped_opcodes(n):
        if not started:
            # Each header gets its own date, independently of the other.
            fromdate = todate = ''
            if fromfiledate:
                fromdate = '\t%s' % fromfiledate
            if tofiledate:
                todate = '\t%s' % tofiledate
            yield '--- %s%s%s' % (fromfile, fromdate, lineterm)
            yield '+++ %s%s%s' % (tofile, todate, lineterm)
            started = True
        # The hunk spans from the start of its first opcode to the end of
        # its last one, in both sequences.
        i1, i2, j1, j2 = group[0][1], group[-1][2], group[0][3], group[-1][4]
        yield "@@ -%d,%d +%d,%d @@%s" % (i1 + 1, i2 - i1, j1 + 1, j2 - j1,
                                         lineterm)
        for tag, i1, i2, j1, j2 in group:
            if tag == 'equal':
                for line in a[i1:i2]:
                    yield ' ' + line
                continue
            if tag == 'replace' or tag == 'delete':
                for line in a[i1:i2]:
                    yield '-' + line
            if tag == 'replace' or tag == 'insert':
                for line in b[j1:j2]:
                    yield '+' + line
# needescape(line) finds the first character that forces a line to be
# escaped: any control character except newline, or anything in the
# \x7f-\xff range.
needescape = re.compile(r'[\x00-\x09\x0b-\x1f\x7f-\xff]').search
# escapesub(repl, line) rewrites those same characters plus the backslash,
# which has to be doubled once a line is marked as escaped.
escapesub = re.compile(r'[\x00-\x09\x0b-\x1f\\\x7f-\xff]').sub
# Replacement text for every character code 0-255: \xNN by default, with
# friendlier spellings for backslash, carriage return and tab.
escapemap = {}
for _code in range(256):
    escapemap[chr(_code)] = r'\x%02x' % _code
escapemap['\\'] = '\\\\'
escapemap['\r'] = r'\r'
escapemap['\t'] = r'\t'


def escape(s):
    """Like the string-escape codec, but doesn't escape quotes

    The last character of s is dropped, the rest is escaped, and the
    marker ' (esc)' plus a newline is appended.
    """
    def replacement(found):
        return escapemap[found.group(0)]

    body = s[:-1]
    return escapesub(replacement, body) + ' (esc)\n'
def test(path, indent=2):
    """Run test at path and return input, output, and diff.

    This returns a 3-tuple containing the following:

        (list of lines in test, same list with actual output, diff)

    diff is a generator that yields the diff between the two lists, or
    [] when the two lists are identical.

    If a test exits with return code 80, the actual output is set to
    None and diff is set to [].

    indent is the number of spaces that introduces command ('$ '),
    continuation ('> ') and expected-output lines in the test file.
    Commands are fed to a single /bin/sh process whose environment is a
    copy of os.environ plus TESTDIR, the directory containing the test.
    """
    indent = ' ' * indent
    cmdline = '%s$ ' % indent
    conline = '%s> ' % indent

    # Read the whole script up front so the file handle is always closed,
    # even if spawning or feeding the shell fails below.
    f = open(path)
    try:
        lines = f.readlines()
    finally:
        f.close()

    abspath = os.path.abspath(path)
    env = os.environ.copy()
    env['TESTDIR'] = os.path.dirname(abspath)
    p = subprocess.Popen(['/bin/sh', '-'], bufsize=-1, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         universal_newlines=True, env=env,
                         close_fds=os.name == 'posix')
    # Marker echoed between commands so the combined output can be split
    # back up per command; it also carries the line number and exit code.
    salt = 'CRAM%s' % time.time()

    # after maps the line number of a command (-1 before the first one) to
    # the non-output lines that must be re-emitted once that command's
    # output has been collected: the next command line, its continuation
    # lines, and any unindented commentary.
    after = {}
    refout, postout = [], []
    i = pos = prepos = -1
    for i, line in enumerate(lines):
        refout.append(line)
        if line.startswith(cmdline):
            after.setdefault(pos, []).append(line)
            prepos = pos
            pos = i
            p.stdin.write('echo "\n%s %s $?"\n' % (salt, i))
            p.stdin.write(line[len(cmdline):])
        elif line.startswith(conline):
            after.setdefault(prepos, []).append(line)
            p.stdin.write(line[len(conline):])
        elif not line.startswith(indent):
            after.setdefault(pos, []).append(line)
    # Final marker, so the exit code of the last command is reported too.
    p.stdin.write('echo "\n%s %s $?"\n' % (salt, i + 1))

    output = p.communicate()[0]
    if p.returncode == 80:
        return (refout, None, [])

    # Add a trailing newline to the input script if it's missing.
    if refout and not refout[-1].endswith('\n'):
        refout[-1] += '\n'

    # We use str.split instead of splitlines to get consistent
    # behavior between Python 2 and 3. In 3, we use unicode strings,
    # which has more line breaks than \n and \r.
    pos = -1
    ret = 0
    for line in output[:-1].split('\n'):
        line += '\n'
        if line.startswith(salt):
            # The marker is echoed after a newline, so the entry before it
            # is an empty indented line unless the previous command's
            # output did not end with a newline.
            presalt = postout.pop()
            if presalt != '%s\n' % indent:
                postout.append(presalt[:-1] + ' (no-eol)\n')
            ret = int(line.split()[2])
            if ret != 0:
                postout.append('%s[%s]\n' % (indent, ret))
            postout += after.pop(pos, [])
            pos = int(line.split()[1])
        else:
            if needescape(line):
                line = escape(line)
            postout.append(indent + line)
    postout += after.pop(pos, [])

    diff = unified_diff(refout, postout, abspath, abspath + '.err')
    # Pull the first line so that "no differences" can be reported as an
    # empty (falsy) list instead of an always-truthy generator.
    for firstline in diff:
        return refout, postout, itertools.chain([firstline], diff)
    return refout, postout, []
def prompt(question, answers, auto=None):
    """Write a prompt to stdout and ask for answer in stdin.

    answers should be a string, with each character a single
    answer. An uppercase letter is considered the default answer.

    If an invalid answer is given, this asks again until it gets a
    valid one. Only a single character from answers is valid; input is
    compared case-insensitively and the lowercase form is returned. An
    empty line selects the default answer, returned as written in
    answers.

    If auto is set, the question is answered automatically with the
    specified value.

    Raises EOFError if stdin is exhausted and there is no default
    answer to fall back on.
    """
    default = [c for c in answers if c.isupper()]
    valid = answers.lower()
    while True:
        sys.stdout.write('%s [%s] ' % (question, answers))
        sys.stdout.flush()
        if auto is not None:
            sys.stdout.write(auto + '\n')
            sys.stdout.flush()
            return auto

        raw = sys.stdin.readline()
        answer = raw.strip().lower()
        if not answer and default:
            return default[0]
        # Require exactly one character: a plain substring test would also
        # accept input such as 'yn'.
        if len(answer) == 1 and answer in valid:
            return answer
        if not raw:
            # readline() returns '' only at end of input; asking again
            # would loop forever.
            raise EOFError('no answer available on stdin')
def log(msg=None, verbosemsg=None, verbose=False):
    """Write a message to standard out and flush it.

    msg is used normally; when verbose is True, verbosemsg is used in
    its place. Nothing is written if the chosen message is empty or
    None.
    """
    text = msg
    if verbose:
        text = verbosemsg
    if not text:
        return
    out = sys.stdout
    out.write(text)
    out.flush()
def patch(cmd, diff):
    """Run echo [lines from diff] | cmd -p0

    cmd is started with the single argument '-p0' and every line of
    diff is written to its standard input. Returns True if the command
    exits with status 0, False otherwise.
    """
    proc = subprocess.Popen([cmd, '-p0'], bufsize=-1, stdin=subprocess.PIPE,
                            universal_newlines=True,
                            close_fds=os.name == 'posix')
    feed = proc.stdin
    for chunk in diff:
        feed.write(chunk)
    feed.close()
    status = proc.wait()
    return status == 0
def run(paths, tmpdir, quiet=False, verbose=False, patchcmd=None, answer=None,
indent=2):
"""Run tests in paths in tmpdir.
If quiet is True, diffs aren't printed. If verbose is True,
filenames and status information are printed.
If patchcmd is set, a prompt is written to stdout asking if
changed output should be merged back into the original test. The
answer is read from stdin. If 'y', the test is patched using patch
based on the changed output.
"""
cwd = os.getcwd()
seen = set()
skipped = failed = 0
for path in findtests(paths):
abspath = os.path.abspath(path)
if abspath in seen:
continue
seen.add(abspath)
log(None, '%s: ' % path, verbose)
if not os.stat(abspath).st_size:
skipped += 1
log('s', 'empty\n', verbose)
else:
testdir = os.path.join(tmpdir, os.path.basename(path))
os.mkdir(testdir)
try:
os.chdir(testdir)
refout, postout, diff = test(abspath, indent)
finally:
os.chdir(cwd)
errpath = abspath + '.err'
if postout is None:
skipped += 1
log('s', 'skipped\n', verbose)
elif not diff:
log('.', 'passed\n', verbose)
if os.path.exists(errpath):
os.remove(errpath)
else:
failed += 1
log('!', 'failed\n', verbose)
if not quiet:
log('\n', None, verbose)
try:
errfile = open(errpath, 'w')
for line in postout:
errfile.write(line)
errfile.close()
except:
# XXX - hack for VPATH builds
pass
if not quiet:
if patchcmd:
diff = list(diff)
for line in diff:
log(line)
if (patchcmd and
prompt('Accept this change?', 'yN', answer) == 'y'):
if patch(patchcmd, diff):
log(None, '%s: merged output\n' % path, verbose)
os.remove(errpath)
else:
log('%s: merge failed\n' % path)
log('\n', None, verbose)
log('# Ran %s tests, %s skipped, %s failed.\n'
% (len(seen), skipped, failed))
return bool(failed)
def which(cmd):
    """Return the path to cmd or None if not found

    The directories listed in the PATH environment variable are searched
    in order (os.defpath is used when PATH is not set). The first entry
    that is a regular file and is executable by the current user wins;
    directories that happen to be named cmd are skipped.
    """
    for p in os.environ.get('PATH', os.defpath).split(os.pathsep):
        path = os.path.join(p, cmd)
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path
    return None
def main(args):
    """Main entry point.

    args should not contain the script name. Extra options are read from
    the CRAM environment variable and appended to a copy of args; the
    caller's list is never modified.

    Returns 2 for usage errors (conflicting options, -i without a patch
    command on PATH, no test paths, a nonexistent path), None after
    printing the version, and otherwise the result of run(): True if any
    test failed, False if not.

    Before tests run, a temporary directory is created and exported as
    CRAMTMP, and TMPDIR, TEMP and TMP are pointed at a 'tmp' directory
    inside it. The temporary directory is removed afterwards unless
    --keep-tmpdir is given.
    """
    from optparse import OptionParser

    eargs = os.environ.get('CRAM', '').strip()
    if eargs:
        import shlex
        # Build a new list instead of extending the caller's in place.
        args = list(args) + shlex.split(eargs)

    p = OptionParser(usage='cram [OPTIONS] TESTS...')
    p.add_option('-V', '--version', action='store_true',
                 help='show version information and exit')
    p.add_option('-q', '--quiet', action='store_true',
                 help="don't print diffs")
    p.add_option('-v', '--verbose', action='store_true',
                 help='show filenames and test status')
    p.add_option('-i', '--interactive', action='store_true',
                 help='interactively merge changed test output')
    p.add_option('-y', '--yes', action='store_true',
                 help='answer yes to all questions')
    p.add_option('-n', '--no', action='store_true',
                 help='answer no to all questions')
    p.add_option('--keep-tmpdir', action='store_true',
                 help='keep temporary directories')
    p.add_option('--indent', action='store', default=2, metavar='NUM',
                 type='int', help='number of spaces to use for indentation')
    p.add_option('-E', action='store_false', dest='sterilize', default=True,
                 help="don't reset common environment variables")
    opts, paths = p.parse_args(args)

    if opts.version:
        sys.stdout.write("""Cram CLI testing framework (version 0.5)
Copyright (C) 2010-2011 Brodie Rao <brodie@bitheap.org> and others
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
""")
        return

    conflicts = [('-y', opts.yes, '-n', opts.no),
                 ('-q', opts.quiet, '-i', opts.interactive)]
    for s1, o1, s2, o2 in conflicts:
        if o1 and o2:
            sys.stderr.write('options %s and %s are mutually exclusive\n'
                             % (s1, s2))
            return 2

    patchcmd = None
    if opts.interactive:
        patchcmd = which('patch')
        if not patchcmd:
            sys.stderr.write('patch(1) required for -i\n')
            return 2

    if not paths:
        sys.stdout.write(p.get_usage())
        return 2

    badpaths = [path for path in paths if not os.path.exists(path)]
    if badpaths:
        sys.stderr.write('no such file: %s\n' % badpaths[0])
        return 2

    tmpdir = os.environ['CRAMTMP'] = tempfile.mkdtemp('', 'cramtests-')
    # Everything after the directory exists runs under try/finally so that
    # it is cleaned up even if the environment setup below fails.
    try:
        proctmp = os.path.join(tmpdir, 'tmp')
        os.mkdir(proctmp)
        for s in ('TMPDIR', 'TEMP', 'TMP'):
            os.environ[s] = proctmp

        # XXX - VPATH hackery
        if 'BUILDDIR' not in os.environ:
            parent = os.path.dirname(
                os.path.realpath(os.path.dirname(__file__)))
            os.environ['BUILDDIR'] = parent

        if opts.sterilize:
            for s in ('LANG', 'LC_ALL', 'LANGUAGE'):
                os.environ[s] = 'C'
            os.environ['TZ'] = 'GMT'
            os.environ['CDPATH'] = ''
            os.environ['COLUMNS'] = '80'
            os.environ['GREP_OPTIONS'] = ''
            # XXX whack SSH_CONNECTION
            # NOTE(review): assumed to belong to the sterilize branch with
            # the other environment resets - confirm against the original
            # layout.
            os.environ.pop('SSH_CONNECTION', None)

        if opts.yes:
            answer = 'y'
        elif opts.no:
            answer = 'n'
        else:
            answer = None

        return run(paths, tmpdir, opts.quiet, opts.verbose, patchcmd, answer,
                   opts.indent)
    finally:
        if opts.keep_tmpdir:
            log('# Kept temporary directory: %s\n' % tmpdir)
        else:
            shutil.rmtree(tmpdir)
if __name__ == '__main__':
    # Script entry point: the value returned by main() becomes the exit
    # status. An interrupt (Ctrl-C) ends the program without a traceback.
    try:
        status = main(sys.argv[1:])
        sys.exit(status)
    except KeyboardInterrupt:
        pass