blob: c828e5e789bca7fe83d291e1970de93954c173ee [file] [log] [blame]
#!/usr/bin/env python3
"""
This is the archiver ID generator test suite.
It tests live generated IDs against a set of predefined reference constants.
"""
import sys
import os
import mailbox
import yaml
import argparse
import collections
import interfacer
import time
import email.utils
# Forwarded as the 'parse_html' field of the per-generator 'testargs' tuple
# that is handed to interfacer.Archiver.
parse_html = False
# Not referenced by any code visible in this file.
# NOTE(review): possibly read by another module - confirm before removing.
nonce = None
# Stand-in for a parsed argument namespace, passed as the first argument to
# compute_updates(); carries verbose=False and ibody=None.
fake_args = collections.namedtuple('fakeargs', ['verbose', 'ibody'])(False, None)
# get raw message, allowing for mboxo translation
def _raw(args, mbox, key):
if args.nomboxo: # No need to filter the data
file=mbox.get_file(key, True)
message_raw=file.read()
file.close()
else:
from mboxo_patch import MboxoReader
file=mbox.get_file(key, True)
file=MboxoReader(file)
message_raw=file.read()
file.close()
return message_raw
def generate_specs(args):
    """Generate a YAML test spec from the messages in an mbox file.

    For every selected generator type, each message in ``args.mboxfile`` is
    passed through ``interfacer.Archiver.compute_updates`` and the resulting
    ``mid`` is recorded together with the message index and its Message-ID
    header. Messages for which no result is returned are reported on stdout
    and left out of the spec. The collected data is written to
    ``args.generate``.

    Args:
        args: Parsed command-line namespace; reads ``nomboxo``,
            ``generators``, ``mboxfile``, ``lid`` and ``generate``.
    """
    from contextlib import closing

    if not args.nomboxo:
        # Temporary patch to fix Python email package limitation
        # It must be removed when the Python package is fixed
        from mboxo_patch import MboxoFactory
    import archiver

    if args.generators:
        generator_names = args.generators
    else:
        try:
            import generators
        except ImportError:
            # Fall back to the alternative module location.
            import plugins.generators as generators
        if hasattr(generators, 'generator_names'):
            generator_names = generators.generator_names()
        else:
            generator_names = ['full', 'medium', 'cluster', 'legacy']

    factory = None if args.nomboxo else MboxoFactory
    yml = {}
    # sort so most recent generators come last to make comparisons easier
    for gen_type in sorted(generator_names, key=lambda s: s.replace('dkim', 'zkim')):
        test_args = collections.namedtuple('testargs', ['parse_html', 'generator'])(parse_html, gen_type)
        archie = interfacer.Archiver(archiver, test_args)
        sys.stderr.write("Generating specs for type '%s'...\n" % gen_type)
        gen_spec = []
        # closing() guarantees the mailbox file handle is released after
        # each generator pass, including when a message raises.
        with closing(mailbox.mbox(args.mboxfile, factory, create=False)) as mbox:
            for key in mbox.keys():
                message_raw = _raw(args, mbox, key)
                message = mbox.get(key)
                lid = args.lid or archiver.normalize_lid(message.get('list-id', '??'))
                result = archie.compute_updates(fake_args, lid, False, message, message_raw)
                mid = message.get('message-id', '').strip()
                if result:
                    gen_spec.append({
                        'index': key,
                        'message-id': mid,
                        'generated': result['mid'],
                    })
                else:
                    print("Cannot parse index %d: %s" % (key, mid))
        yml[gen_type] = gen_spec

    # The with-statement closes the output file; no explicit close() needed.
    with open(args.generate, 'w') as f:
        # don't sort keys here
        yaml.dump({'args': {'cmd': " ".join(sys.argv)}, 'generators': {args.mboxfile: yml}}, f, sort_keys=False)
def run_tests(args):
    """Run the generator tests described by the YAML spec in ``args.load``.

    The spec's ``generators`` mapping associates mbox file names with
    per-generator lists of tests. Each test names a message ``index``, the
    expected ``message-id`` header and the expected ``generated`` ID, which
    may be overridden by a key equal to the archiver's ``version``. File
    names with no tests are queued and run against the tests of the next
    entry that has some.

    A result line is emitted per test: ``[PASS]`` and ``[SKIP]`` on stdout,
    ``[FAIL]`` and ``[SEQ?]`` on stderr. A final ``[DONE]`` summary goes to
    stdout. When ``args.dropin`` names a generator and there were failures,
    the spec file is rewritten with the IDs actually produced for it.

    Args:
        args: Parsed command-line namespace; reads ``nomboxo``, ``load``,
            ``generators``, ``skipnodate``, ``lid`` and ``dropin``.

    Raises:
        SystemExit: With status -1 when at least one test failed.
    """
    from contextlib import closing

    if not args.nomboxo:
        # Temporary patch to fix Python email package limitation
        # It must be removed when the Python package is fixed
        from mboxo_patch import MboxoFactory
    import archiver
    import logging
    verbose_logger = logging.getLogger()
    verbose_logger.setLevel(logging.WARN)
    verbose_logger.addHandler(logging.StreamHandler(sys.stderr))
    archiver.logger = verbose_logger
    try:
        import generators
    except ImportError:
        # Fall back to the alternative module location.
        import plugins.generators as generators

    errors = 0
    skipped = 0
    tests_run = 0
    with open(args.load, 'r') as spec_file:
        yml = yaml.safe_load(spec_file)
    _env = {}
    if 'args' in yml and 'env' in yml['args']:
        _env = yml['args']['env']
    generator_names = generators.generator_names() if hasattr(generators, 'generator_names') else ['full', 'medium', 'cluster', 'legacy']
    if args.generators:
        generator_names = args.generators
    factory = None if args.nomboxo else MboxoFactory

    mboxfiles = []
    for file, run in yml['generators'].items():
        mboxfiles.append(file)
        if not run:  # No tests under this filename, run same tests as next
            continue
        for gen_type, tests in run.items():
            if gen_type not in generator_names:
                sys.stderr.write("Warning: generators.py does not have the '%s' generator, skipping tests\n" % gen_type)
                continue
            test_args = collections.namedtuple('testargs', ['parse_html', 'generator'])(parse_html, gen_type)
            archie = interfacer.Archiver(archiver, test_args)
            for mboxfile in mboxfiles:
                sys.stderr.write("Starting to process %s using %s\n" % (mboxfile, gen_type))
                # closing() releases the mailbox file handle once this
                # file/generator combination has been processed.
                with closing(mailbox.mbox(mboxfile, factory, create=False)) as mbox:
                    no_messages = len(mbox.keys())
                    no_tests = len(tests)
                    if no_messages != no_tests:
                        sys.stderr.write("Warning: %s run for %s contains %u tests, but mbox file has %u emails!\n" %
                                         (gen_type, mboxfile, no_tests, no_messages))
                    for test in tests:
                        tests_run += 1
                        key = test['index']
                        message_raw = _raw(args, mbox, key)
                        message = mbox.get(key)
                        # Mock archived-at for slightly broken medium generators
                        if 'MOCK_AAT' in _env and gen_type == 'medium':
                            mock_aat = email.utils.formatdate(int(_env['MOCK_AAT']), False)
                            try:
                                message.replace_header('archived-at', mock_aat)
                            except KeyError:
                                # replace_header() raises KeyError when the
                                # header is absent; add it instead.
                                message['archived-at'] = mock_aat
                        msgid = (message.get('message-id') or '').strip()
                        dateheader = message.get('date')
                        if args.skipnodate and not dateheader:
                            print("""[SKIP] %s, index %2u: No date header found and --skipnodate specified, skipping this test!""" %
                                  (gen_type, key, ))
                            skipped += 1
                            continue
                        if msgid != test['message-id']:
                            sys.stderr.write("""[SEQ?] %s, index %2u: Expected '%s', got '%s'!\n""" %
                                             (gen_type, key, test['message-id'], msgid))
                            continue  # no point continuing
                        lid = args.lid or archiver.normalize_lid(message.get('list-id', '??'))
                        result = archie.compute_updates(fake_args, lid, False, message, message_raw)
                        # get override for version (if any)
                        expected = test.get(archie.version, test['generated'])
                        if not result:
                            # No result for this message: count it as a
                            # failure instead of crashing on result['mid'].
                            errors += 1
                            sys.stderr.write("""[FAIL] %s, index %2u: Expected '%s', but the message could not be parsed!\n""" %
                                             (gen_type, key, expected))
                            continue
                        actual = result['mid']
                        if actual != expected:
                            errors += 1
                            sys.stderr.write("""[FAIL] %s, index %2u: Expected '%s', got '%s'!\n""" %
                                             (gen_type, key, expected, actual))
                            if args.dropin and gen_type == args.dropin:
                                # NOTE(review): this repeats the enclosing
                                # comparison, so the 'alternate' branch looks
                                # unreachable - confirm the intended nesting.
                                if expected != actual:
                                    test['generated'] = actual
                                else:
                                    test['alternate'] = actual
                        else:
                            print("[PASS] %s index %u" % (gen_type, key))
        mboxfiles = []  # reset for the next set of tests

    if args.dropin and errors:
        sys.stderr.write("Writing replacement yaml as --dropin was specified\n")
        # Close (and thereby flush) the rewritten spec before exiting below.
        with open(args.load, "w") as spec_file:
            yaml.safe_dump(yml, spec_file, sort_keys=False)
    # N.B. The following line is parsed by runall.py
    print("[DONE] %u tests run, %u failed. Skipped %u." % (tests_run, errors, skipped))
    if errors:
        sys.exit(-1)
def main():
    """Command-line entry point.

    Parses the options, appends the Pony Mail ``tools`` directory to
    ``sys.path``, optionally replaces ``time.gmtime`` when the ``MOCK_GMTIME``
    environment variable is set, and then either generates a spec
    (``--generate``) or runs the tests from one (``--load``).
    """
    cli = argparse.ArgumentParser(description='Command line options.')
    option = cli.add_argument
    option('--generate', dest='generate', type=str,
           help='Generate a test yaml spec, output to file specified here')
    option('--generators', dest='generators', nargs='+', type=str,
           help='Override the list of generator names')
    option('--load', dest='load', type=str,
           help='Load and run tests from a yaml spec file')
    option('--mbox', dest='mboxfile', type=str,
           help='If generating spec, which mbox corpus file to use for testing')
    option('--listid', dest='lid', type=str,
           help='List-ID header override if needed')
    option('--rootdir', dest='rootdir', type=str, required=True,
           help="Root directory of Apache Pony Mail")
    option('--nomboxo', dest='nomboxo', action='store_true',
           help='Skip Mboxo processing')
    option('--dropin', dest='dropin', type=str,
           help='Perform drop-in replacement of unit test results for the specified generator type [devs only!]')
    option('--skipnodate', dest='skipnodate', action='store_true',
           help='Skip emails with no Date: header (useful for medium generator tests)')
    args = cli.parse_args()

    # Make the modules in the tools directory importable.
    tools_dir = (
        os.path.join(args.rootdir, 'tools')
        if args.rootdir
        else os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', "tools")
    )
    sys.path.append(tools_dir)

    if os.environ.get('MOCK_GMTIME'):
        import time
        import traceback
        real_gmtime = time.gmtime
        epoch_callers = ("/tools/archiver.py", "tools/generators.py")

        def _time_gmtime(secs=None):
            # An explicit timestamp is always passed straight through.
            if secs is not None:
                return real_gmtime(secs)
            # Two frames: [0] is whoever called us, [1] is this function.
            stack = traceback.extract_stack(limit=2)
            caller_file = stack[0].filename
            # Argument-less calls from the listed files get the epoch.
            if caller_file.endswith(epoch_callers):
                return real_gmtime(0)
            return real_gmtime(secs)

        time.gmtime = _time_gmtime

    if not args.generate:
        if args.load:
            run_tests(args)
        return
    if not args.mboxfile:
        sys.stderr.write("Generating a test spec requires an mbox filepath passed with --mbox!\n")
        sys.exit(-1)
    generate_specs(args)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()