| import py |
| import pytest |
| |
| def pytest_addoption(parser): |
| group = parser.getgroup("xdist", "distributed and subprocess testing") |
| group._addoption('-f', '--looponfail', |
| action="store_true", dest="looponfail", default=False, |
| help="run tests in subprocess, wait for modified files " |
| "and re-run failing test set until all pass.") |
| group._addoption('-n', dest="numprocesses", metavar="numprocesses", default=2, |
| action="store", type="int", |
| help="shortcut for '--dist=load --tx=NUM*popen'") |
| group.addoption('--boxed', |
| action="store_true", dest="boxed", default=False, |
| help="box each test run in a separate process (unix)") |
| group._addoption('--dist', metavar="distmode", |
| action="store", choices=['load', 'each', 'no'], |
| type="choice", dest="dist", default="no", |
| help=("set mode for distributing tests to exec environments.\n\n" |
| "each: send each test to each available environment.\n\n" |
| "load: send each test to available environment.\n\n" |
| "(default) no: run tests inprocess, don't distribute.")) |
| group._addoption('--tx', dest="tx", action="append", default=[], |
| metavar="xspec", |
| help=("add a test execution environment. some examples: " |
| "--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 " |
| "--tx ssh=user@codespeak.net//chdir=testcache")) |
| group._addoption('-d', |
| action="store_true", dest="distload", default=False, |
| help="load-balance tests. shortcut for '--dist=load'") |
| group.addoption('--rsyncdir', action="append", default=[], metavar="DIR", |
| help="add directory for rsyncing to remote tx nodes.") |
| group.addoption('--rsyncignore', action="append", default=[], metavar="GLOB", |
| help="add expression for ignores when rsyncing to remote tx nodes.") |
| |
| parser.addini('rsyncdirs', 'list of (relative) paths to be rsynced for' |
| ' remote distributed testing.', type="pathlist") |
| parser.addini('rsyncignore', 'list of (relative) glob-style paths to be ignored ' |
| 'for rsyncing.', type="pathlist") |
| parser.addini("looponfailroots", type="pathlist", |
| help="directories to check for changes", default=[py.path.local()]) |
| |
| # ------------------------------------------------------------------------- |
| # distributed testing hooks |
| # ------------------------------------------------------------------------- |
| def pytest_addhooks(pluginmanager): |
| from xdist import newhooks |
| pluginmanager.addhooks(newhooks) |
| |
| # ------------------------------------------------------------------------- |
| # distributed testing initialization |
| # ------------------------------------------------------------------------- |
| |
| def pytest_cmdline_main(config): |
| check_options(config) |
| if config.getoption("looponfail"): |
| from xdist.looponfail import looponfail_main |
| looponfail_main(config) |
| return 2 # looponfail only can get stop with ctrl-C anyway |
| |
| def pytest_configure(config, __multicall__): |
| __multicall__.execute() |
| if config.getoption("dist") != "no": |
| from xdist.dsession import DSession |
| session = DSession(config) |
| config.pluginmanager.register(session, "dsession") |
| tr = config.pluginmanager.getplugin("terminalreporter") |
| tr.showfspath = False |
| |
| def check_options(config): |
| if config.option.numprocesses: |
| config.option.dist = "load" |
| config.option.tx = ['popen'] * int(config.option.numprocesses) |
| if config.option.distload: |
| config.option.dist = "load" |
| val = config.getvalue |
| if not val("collectonly"): |
| usepdb = config.option.usepdb # a core option |
| if val("looponfail"): |
| if usepdb: |
| raise pytest.UsageError("--pdb incompatible with --looponfail.") |
| elif val("dist") != "no": |
| if usepdb: |
| raise pytest.UsageError("--pdb incompatible with distributing tests.") |
| |
| |
| def pytest_runtest_protocol(item): |
| if item.config.getvalue("boxed"): |
| reports = forked_run_report(item) |
| for rep in reports: |
| item.ihook.pytest_runtest_logreport(report=rep) |
| return True |
| |
| def forked_run_report(item): |
| # for now, we run setup/teardown in the subprocess |
| # XXX optionally allow sharing of setup/teardown |
| from _pytest.runner import runtestprotocol |
| EXITSTATUS_TESTEXIT = 4 |
| import marshal |
| from xdist.remote import serialize_report |
| from xdist.slavemanage import unserialize_report |
| def runforked(): |
| try: |
| reports = runtestprotocol(item, log=False) |
| except KeyboardInterrupt: |
| py.std.os._exit(EXITSTATUS_TESTEXIT) |
| return marshal.dumps([serialize_report(x) for x in reports]) |
| |
| ff = py.process.ForkedFunc(runforked) |
| result = ff.waitfinish() |
| if result.retval is not None: |
| report_dumps = marshal.loads(result.retval) |
| return [unserialize_report("testreport", x) for x in report_dumps] |
| else: |
| if result.exitstatus == EXITSTATUS_TESTEXIT: |
| py.test.exit("forked test item %s raised Exit" %(item,)) |
| return [report_process_crash(item, result)] |
| |
| def report_process_crash(item, result): |
| path, lineno = item._getfslineno() |
| info = ("%s:%s: running the test CRASHED with signal %d" % |
| (path, lineno, result.signal)) |
| from _pytest import runner |
| call = runner.CallInfo(lambda: 0/0, "???") |
| call.excinfo = info |
| rep = runner.pytest_runtest_makereport(item, call) |
| if result.out: |
| rep.sections.append(("captured stdout", result.out)) |
| if result.err: |
| rep.sections.append(("captured stderr", result.err)) |
| return rep |