#!/usr/bin/env python3
"""
This is the archiver ID generator test suite.
It tests live generated IDs against a set of predefined reference constants.
"""
import sys
import os
import mailbox
import yaml
import argparse
import collections
import interfacer
import time
import email.utils

# Module-wide settings shared by generate_specs() and run_tests().
# parse_html is passed as the 'parse_html' field of the per-generator test args.
parse_html = False
# NOTE(review): nonce is not referenced anywhere else in this file -
# presumably kept for other tooling; confirm before removing.
nonce = None
# Minimal stand-in for a parsed-arguments object (verbose=False, ibody=None);
# it is passed as the first argument to archie.compute_updates().
fake_args = collections.namedtuple('fakeargs', ['verbose', 'ibody'])(False, None)

# get raw message, allowing for mboxo translation
def _raw(args, mbox, key):
    """Return the raw content of message ``key`` as read from ``mbox``.

    Args:
        args: Object with a boolean ``nomboxo`` attribute. When it is false,
            the message stream is wrapped in ``mboxo_patch.MboxoReader``
            before being read.
        mbox: Mailbox providing ``get_file(key, from_)``; it is called with
            ``from_=True``.
        key: Key of the message inside ``mbox``.

    Returns:
        Whatever ``read()`` on the (possibly wrapped) message stream returns.
    """
    if args.nomboxo:  # No need to filter the data
        stream = mbox.get_file(key, True)
    else:
        # Imported lazily: the patch module is only needed for mboxo handling.
        from mboxo_patch import MboxoReader
        stream = MboxoReader(mbox.get_file(key, True))
    try:
        return stream.read()
    finally:
        # Close the stream even if read() raises, so the handle is not leaked.
        stream.close()

def generate_specs(args):
    """Generate a YAML reference spec of message IDs for each generator type.

    For every generator name, each message of the mbox file ``args.mboxfile``
    is passed to ``archie.compute_updates`` and the resulting ``'mid'`` is
    recorded together with the mbox key and the Message-ID header. The
    collected data is written to the file named by ``args.generate``.

    Args:
        args: Parsed command-line arguments. Reads ``nomboxo``,
            ``generators``, ``mboxfile``, ``lid`` and ``generate``.
    """
    if args.nomboxo:
        mbox_factory = None
    else:
        # Temporary patch to fix Python email package limitation
        # It must be removed when the Python package is fixed
        from mboxo_patch import MboxoFactory
        mbox_factory = MboxoFactory
    import archiver
    if args.generators:
        generator_names = args.generators
    else:
        try:
            import generators
        except ImportError:
            # Fall back to the plugins package when there is no top-level module
            import plugins.generators as generators
        if hasattr(generators, 'generator_names'):
            generator_names = generators.generator_names()
        else:
            generator_names = ['full', 'medium', 'cluster', 'legacy']
    # Build the argument type once rather than once per generator
    test_args_type = collections.namedtuple('testargs', ['parse_html', 'generator'])
    yml = {}
    # sort so most recent generators come last to make comparisons easier
    # (names containing 'dkim' are ordered as if they were spelled 'zkim')
    for gen_type in sorted(generator_names, key=lambda s: s.replace('dkim', 'zkim')):
        test_args = test_args_type(parse_html, gen_type)
        archie = interfacer.Archiver(archiver, test_args)
        sys.stderr.write("Generating specs for type '%s'...\n" % gen_type)

        gen_spec = []
        mbox = mailbox.mbox(args.mboxfile, mbox_factory, create=False)
        try:
            for key in mbox.keys():
                message_raw = _raw(args, mbox, key)
                message = mbox.get(key)
                lid = args.lid or archiver.normalize_lid(message.get('list-id', '??'))
                updates = archie.compute_updates(fake_args, lid, False, message, message_raw)
                mid = message.get('message-id', '').strip()
                if updates:
                    gen_spec.append({
                        'index': key,
                        'message-id': mid,
                        'generated': updates['mid'],
                    })
                else:
                    print("Cannot parse index %d: %s" % (key, mid))
        finally:
            # Release the mailbox file handle before moving to the next generator
            mbox.close()
        yml[gen_type] = gen_spec
    with open(args.generate, 'w') as f:
        # don't sort keys here
        yaml.dump({'args': {'cmd': " ".join(sys.argv)}, 'generators': {args.mboxfile: yml}}, f, sort_keys=False)


def run_tests(args):
    """Run the ID generator tests described by the YAML spec ``args.load``.

    The spec maps mbox file names to ``{generator type: [tests]}``. Each test
    holds the mbox ``index``, the expected ``message-id`` header and the
    expected ``generated`` ID (optionally an ``alternate`` ID). An mbox entry
    with no tests is queued and checked against the tests of the next entry
    that has some. Results are printed as ``[PASS]``/``[SKIP]`` on stdout and
    ``[FAIL]``/``[SEQ?]`` on stderr, followed by a ``[DONE]`` summary line.

    Args:
        args: Parsed command-line arguments. Reads ``nomboxo``, ``load``,
            ``generators``, ``lid``, ``skipnodate`` and ``dropin``.

    Raises:
        SystemExit: With status -1 if at least one test failed.
    """
    if args.nomboxo:
        mbox_factory = None
    else:
        # Temporary patch to fix Python email package limitation
        # It must be removed when the Python package is fixed
        from mboxo_patch import MboxoFactory
        mbox_factory = MboxoFactory
    import archiver
    import logging
    verbose_logger = logging.getLogger()
    verbose_logger.setLevel(logging.WARN)
    verbose_logger.addHandler(logging.StreamHandler(sys.stderr))
    archiver.logger = verbose_logger

    try:
        import generators
    except ImportError:
        # Fall back to the plugins package when there is no top-level module
        import plugins.generators as generators
    errors = 0
    skipped = 0
    tests_run = 0
    with open(args.load, 'r') as spec_file:
        yml = yaml.safe_load(spec_file)
    _env = {}
    if 'args' in yml and 'env' in yml['args']:
        _env = yml['args']['env']
    generator_names = generators.generator_names() if hasattr(generators, 'generator_names') else ['full', 'medium', 'cluster', 'legacy']
    if args.generators:
        generator_names = args.generators
    # Build the argument type once rather than once per generator
    test_args_type = collections.namedtuple('testargs', ['parse_html', 'generator'])
    mboxfiles = []
    for file, run in yml['generators'].items():
        mboxfiles.append(file)
        if not run: # No tests under this filename, run same tests as next
            continue
        for gen_type, tests in run.items():
            if gen_type not in generator_names:
                sys.stderr.write("Warning: generators.py does not have the '%s' generator, skipping tests\n" % gen_type)
                continue
            test_args = test_args_type(parse_html, gen_type)
            archie = interfacer.Archiver(archiver, test_args)
            for mboxfile in mboxfiles:
                sys.stderr.write("Starting to process %s using %s\n" % (mboxfile,gen_type))
                mbox = mailbox.mbox(mboxfile, mbox_factory, create=False)
                try:
                    no_messages = len(mbox)
                    no_tests = len(tests)
                    if no_messages != no_tests:
                        sys.stderr.write("Warning: %s run for %s contains %u tests, but mbox file has %u emails!\n" %
                                        (gen_type, mboxfile, no_tests, no_messages))
                    for test in tests:
                        tests_run += 1
                        key = test['index']
                        message_raw = _raw(args, mbox, key)
                        message = mbox.get(key)
                        # Mock archived-at for slightly broken medium generators
                        if 'MOCK_AAT' in _env and gen_type == 'medium':
                            mock_aat = email.utils.formatdate(int(_env['MOCK_AAT']), False)
                            try:
                                message.replace_header('archived-at', mock_aat)
                            except KeyError:
                                # replace_header() raises KeyError when the header is absent
                                message['archived-at'] = mock_aat
                        msgid = (message.get('message-id') or '').strip()
                        dateheader = message.get('date')
                        if args.skipnodate and not dateheader:
                            print("[SKIP] %s, index %2u: No date header found and --skipnodate specified, skipping this test!" %
                                  (gen_type, key, ))
                            skipped += 1
                            continue
                        if msgid != test['message-id']:
                            sys.stderr.write("[SEQ?] %s, index %2u: Expected '%s', got '%s'!\n" %
                                             (gen_type, key, test['message-id'], msgid))
                            continue # no point continuing
                        lid = args.lid or archiver.normalize_lid(message.get('list-id', '??'))
                        updates = archie.compute_updates(fake_args, lid, False, message, message_raw)

                        expected = test['generated']
                        if not updates:
                            # No result for this message: report it as a
                            # failed test instead of crashing on updates['mid'].
                            errors += 1
                            sys.stderr.write("[FAIL] %s, index %2u: Expected '%s', got no result!\n" %
                                             (gen_type, key, expected))
                            continue
                        alternate = expected
                        if archie.version == '10' or archie.version == _env.get('ALT_VERSION'):
                            alternate = test.get('alternate') # alternate value for v0.10
                        actual = updates['mid']
                        if actual != expected and actual != alternate:
                            errors += 1
                            sys.stderr.write("[FAIL] %s, index %2u: Expected '%s', got '%s'!\n" %
                                             (gen_type, key, expected, actual))
                            if args.dropin and gen_type == args.dropin:
                                # actual always differs from expected here, so only
                                # 'generated' is ever replaced.
                                # NOTE(review): updating 'alternate' was never
                                # reachable from this branch - confirm the intent.
                                test['generated'] = actual
                        else:
                            print("[PASS] %s index %u" % (gen_type, key))
                finally:
                    # Release the mailbox file handle before opening the next one
                    mbox.close()
        mboxfiles = [] # reset for the next set of tests
    if args.dropin and errors:
        sys.stderr.write("Writing replacement yaml as --dropin was specified\n")
        with open(args.load, "w") as spec_file:
            yaml.safe_dump(yml, spec_file, sort_keys=False)
    # N.B. The following line is parsed by runall.py
    print("[DONE] %u tests run, %u failed. Skipped %u." % (tests_run, errors, skipped))
    if errors:
        sys.exit(-1)


def main():
    """Parse the command line and either generate a spec or run the tests.

    ``<rootdir>/tools`` is appended to ``sys.path``. If the ``MOCK_GMTIME``
    environment variable is set, ``time.gmtime`` is replaced by a wrapper
    that returns the epoch for argument-less calls made from files ending in
    ``/tools/archiver.py`` or ``tools/generators.py``.

    With ``--generate`` the spec is generated (``--mbox`` is then required);
    otherwise with ``--load`` the tests are run. If neither is given, a hint
    is written to stderr and nothing else happens.

    Raises:
        SystemExit: With status -1 if ``--generate`` is used without
            ``--mbox``; argparse also exits on invalid arguments.
    """
    parser = argparse.ArgumentParser(description='Command line options.')
    parser.add_argument('--generate', dest='generate', type=str,
                        help='Generate a test yaml spec, output to file specified here')
    parser.add_argument('--generators', dest='generators', nargs='+', type=str,
                        help='Override the list of generator names')
    parser.add_argument('--load', dest='load', type=str,
                        help='Load and run tests from a yaml spec file')
    parser.add_argument('--mbox', dest='mboxfile', type=str,
                        help='If generating spec, which mbox corpus file to use for testing')
    parser.add_argument('--listid', dest='lid', type=str,
                        help='List-ID header override if needed')
    parser.add_argument('--rootdir', dest='rootdir', type=str, required=True,
                        help="Root directory of Apache Pony Mail")
    parser.add_argument('--nomboxo', dest='nomboxo', action='store_true',
                        help='Skip Mboxo processing')
    parser.add_argument('--dropin', dest='dropin', type=str,
                        help='Perform drop-in replacement of unit test results for the specified generator type [devs only!]')
    parser.add_argument('--skipnodate', dest='skipnodate', action='store_true',
                        help='Skip emails with no Date: header (useful for medium generator tests)')
    args = parser.parse_args()

    if args.rootdir:
        tools_dir = os.path.join(args.rootdir, 'tools')
    else:
        # Only reached when --rootdir is given as an empty string
        tools_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', "tools")
    sys.path.append(tools_dir)

    if os.environ.get('MOCK_GMTIME'):
        import traceback
        save_gmtime = time.gmtime

        def _time_gmtime(secs=None):
            # Only argument-less calls (i.e. "now") are candidates for mocking
            if secs is None:
                callers = traceback.extract_stack(limit=2) # want last-1 and last (i.e. here)
                [filename, _, _, _] = callers[0] # This is last-1, i.e. my caller
                if filename.endswith(("/tools/archiver.py", "tools/generators.py")):
                    return save_gmtime(0)
            return save_gmtime(secs)

        time.gmtime = _time_gmtime

    if args.generate:
        if not args.mboxfile:
            sys.stderr.write("Generating a test spec requires an mbox filepath passed with --mbox!\n")
            sys.exit(-1)
        generate_specs(args)
    elif args.load:
        run_tests(args)
    else:
        # Make the no-op visible instead of returning silently
        sys.stderr.write("Nothing to do: specify --generate (together with --mbox) or --load\n")


# Run the command-line entry point only when executed as a script, not on import.
if __name__ == '__main__':
    main()
