| import fnmatch |
| import os |
| |
| import py |
| import pytest |
| import execnet |
| import xdist.remote |
| |
| from _pytest import runner # XXX load dynamically |
| |
| class NodeManager(object): |
| EXIT_TIMEOUT = 10 |
| DEFAULT_IGNORES = ['.*', '*.pyc', '*.pyo', '*~'] |
| def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"): |
| self.config = config |
| self._nodesready = py.std.threading.Event() |
| self.trace = self.config.trace.get("nodemanager") |
| self.group = execnet.Group() |
| if specs is None: |
| specs = self._getxspecs() |
| self.specs = [] |
| for spec in specs: |
| if not isinstance(spec, execnet.XSpec): |
| spec = execnet.XSpec(spec) |
| if not spec.chdir and not spec.popen: |
| spec.chdir = defaultchdir |
| self.group.allocate_id(spec) |
| self.specs.append(spec) |
| self.roots = self._getrsyncdirs() |
| self.rsyncoptions = self._getrsyncoptions() |
| |
| def rsync_roots(self): |
| """ make sure that all remote gateways |
| have the same set of roots in their |
| current directory. |
| """ |
| if self.roots: |
| # send each rsync root |
| for root in self.roots: |
| self.rsync(root, **self.rsyncoptions) |
| |
| def makegateways(self): |
| assert not list(self.group) |
| self.config.hook.pytest_xdist_setupnodes(config=self.config, |
| specs=self.specs) |
| for spec in self.specs: |
| gw = self.group.makegateway(spec) |
| self.config.hook.pytest_xdist_newgateway(gateway=gw) |
| |
| def setup_nodes(self, putevent): |
| self.makegateways() |
| self.rsync_roots() |
| self.trace("setting up nodes") |
| for gateway in self.group: |
| node = SlaveController(self, gateway, self.config, putevent) |
| gateway.node = node # to keep node alive |
| node.setup() |
| self.trace("started node %r" % node) |
| |
| def teardown_nodes(self): |
| self.group.terminate(self.EXIT_TIMEOUT) |
| |
| def _getxspecs(self): |
| xspeclist = [] |
| for xspec in self.config.getvalue("tx"): |
| i = xspec.find("*") |
| try: |
| num = int(xspec[:i]) |
| except ValueError: |
| xspeclist.append(xspec) |
| else: |
| xspeclist.extend([xspec[i+1:]] * num) |
| if not xspeclist: |
| raise pytest.UsageError( |
| "MISSING test execution (tx) nodes: please specify --tx") |
| return [execnet.XSpec(x) for x in xspeclist] |
| |
| def _getrsyncdirs(self): |
| for spec in self.specs: |
| if not spec.popen or spec.chdir: |
| break |
| else: |
| return [] |
| import pytest, _pytest |
| pytestpath = pytest.__file__.rstrip("co") |
| pytestdir = py.path.local(_pytest.__file__).dirpath() |
| config = self.config |
| candidates = [py._pydir,pytestpath,pytestdir] |
| candidates += config.option.rsyncdir |
| rsyncroots = config.getini("rsyncdirs") |
| if rsyncroots: |
| candidates.extend(rsyncroots) |
| roots = [] |
| for root in candidates: |
| root = py.path.local(root).realpath() |
| if not root.check(): |
| raise pytest.UsageError("rsyncdir doesn't exist: %r" %(root,)) |
| if root not in roots: |
| roots.append(root) |
| return roots |
| |
| def _getrsyncoptions(self): |
| """Get options to be passed for rsync.""" |
| ignores = list(self.DEFAULT_IGNORES) |
| ignores += self.config.option.rsyncignore |
| ignores += self.config.getini("rsyncignore") |
| |
| return { |
| 'ignores': ignores, |
| 'verbose': self.config.option.verbose, |
| } |
| |
| |
| def rsync(self, source, notify=None, verbose=False, ignores=None): |
| """ perform rsync to all remote hosts. |
| """ |
| rsync = HostRSync(source, verbose=verbose, ignores=ignores) |
| seen = py.builtin.set() |
| gateways = [] |
| for gateway in self.group: |
| spec = gateway.spec |
| if spec.popen and not spec.chdir: |
| # XXX this assumes that sources are python-packages |
| # and that adding the basedir does not hurt |
| gateway.remote_exec(""" |
| import sys ; sys.path.insert(0, %r) |
| """ % os.path.dirname(str(source))).waitclose() |
| continue |
| if spec not in seen: |
| def finished(): |
| if notify: |
| notify("rsyncrootready", spec, source) |
| rsync.add_target_host(gateway, finished=finished) |
| seen.add(spec) |
| gateways.append(gateway) |
| if seen: |
| self.config.hook.pytest_xdist_rsyncstart( |
| source=source, |
| gateways=gateways, |
| ) |
| rsync.send() |
| self.config.hook.pytest_xdist_rsyncfinish( |
| source=source, |
| gateways=gateways, |
| ) |
| |
| class HostRSync(execnet.RSync): |
| """ RSyncer that filters out common files |
| """ |
| def __init__(self, sourcedir, *args, **kwargs): |
| self._synced = {} |
| ignores= None |
| if 'ignores' in kwargs: |
| ignores = kwargs.pop('ignores') |
| self._ignores = ignores or [] |
| super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs) |
| |
| def filter(self, path): |
| path = py.path.local(path) |
| for x in self._ignores: |
| x = getattr(x, 'strpath', x) |
| if fnmatch.fnmatch(path.basename, x) or fnmatch.fnmatch(path.strpath, x): |
| return False |
| else: |
| return True |
| |
| def add_target_host(self, gateway, finished=None): |
| remotepath = os.path.basename(self._sourcedir) |
| super(HostRSync, self).add_target(gateway, remotepath, |
| finishedcallback=finished, |
| delete=True,) |
| |
| def _report_send_file(self, gateway, modified_rel_path): |
| if self._verbose: |
| path = os.path.basename(self._sourcedir) + "/" + modified_rel_path |
| remotepath = gateway.spec.chdir |
| py.builtin.print_('%s:%s <= %s' % |
| (gateway.spec, remotepath, path)) |
| |
| |
| def make_reltoroot(roots, args): |
| # XXX introduce/use public API for splitting py.test args |
| splitcode = "::" |
| l = [] |
| for arg in args: |
| parts = arg.split(splitcode) |
| fspath = py.path.local(parts[0]) |
| for root in roots: |
| x = fspath.relto(root) |
| if x or fspath == root: |
| parts[0] = root.basename + "/" + x |
| break |
| else: |
| raise ValueError("arg %s not relative to an rsync root" % (arg,)) |
| l.append(splitcode.join(parts)) |
| return l |
| |
| class SlaveController(object): |
| ENDMARK = -1 |
| |
| def __init__(self, nodemanager, gateway, config, putevent): |
| self.nodemanager = nodemanager |
| self.putevent = putevent |
| self.gateway = gateway |
| self.config = config |
| self.slaveinput = {'slaveid': gateway.id} |
| self._down = False |
| self.log = py.log.Producer("slavectl-%s" % gateway.id) |
| if not self.config.option.debug: |
| py.log.setconsumer(self.log._keywords, None) |
| |
| def __repr__(self): |
| return "<%s %s>" %(self.__class__.__name__, self.gateway.id,) |
| |
| def setup(self): |
| self.log("setting up slave session") |
| spec = self.gateway.spec |
| args = self.config.args |
| if not spec.popen or spec.chdir: |
| args = make_reltoroot(self.nodemanager.roots, args) |
| option_dict = vars(self.config.option) |
| if spec.popen: |
| name = "popen-%s" % self.gateway.id |
| basetemp = self.config._tmpdirhandler.getbasetemp() |
| option_dict['basetemp'] = str(basetemp.join(name)) |
| self.config.hook.pytest_configure_node(node=self) |
| self.channel = self.gateway.remote_exec(xdist.remote) |
| self.channel.send((self.slaveinput, args, option_dict)) |
| if self.putevent: |
| self.channel.setcallback(self.process_from_remote, |
| endmarker=self.ENDMARK) |
| |
| def ensure_teardown(self): |
| if hasattr(self, 'channel'): |
| if not self.channel.isclosed(): |
| self.log("closing", self.channel) |
| self.channel.close() |
| #del self.channel |
| if hasattr(self, 'gateway'): |
| self.log("exiting", self.gateway) |
| self.gateway.exit() |
| #del self.gateway |
| |
| def send_runtest_some(self, indices): |
| self.sendcommand("runtests", indices=indices) |
| |
| def send_runtest_all(self): |
| self.sendcommand("runtests_all",) |
| |
| def shutdown(self): |
| if not self._down: |
| try: |
| self.sendcommand("shutdown") |
| except IOError: |
| pass |
| |
| def sendcommand(self, name, **kwargs): |
| """ send a named parametrized command to the other side. """ |
| self.log("sending command %s(**%s)" % (name, kwargs)) |
| self.channel.send((name, kwargs)) |
| |
| def notify_inproc(self, eventname, **kwargs): |
| self.log("queuing %s(**%s)" % (eventname, kwargs)) |
| self.putevent((eventname, kwargs)) |
| |
| def process_from_remote(self, eventcall): |
| """ this gets called for each object we receive from |
| the other side and if the channel closes. |
| |
| Note that channel callbacks run in the receiver |
| thread of execnet gateways - we need to |
| avoid raising exceptions or doing heavy work. |
| """ |
| try: |
| if eventcall == self.ENDMARK: |
| err = self.channel._getremoteerror() |
| if not self._down: |
| if not err or isinstance(err, EOFError): |
| err = "Not properly terminated" # lost connection? |
| self.notify_inproc("errordown", node=self, error=err) |
| self._down = True |
| return |
| eventname, kwargs = eventcall |
| if eventname in ("collectionstart"): |
| self.log("ignoring %s(%s)" %(eventname, kwargs)) |
| elif eventname == "slaveready": |
| self.notify_inproc(eventname, node=self, **kwargs) |
| elif eventname == "slavefinished": |
| self._down = True |
| self.slaveoutput = kwargs['slaveoutput'] |
| self.notify_inproc("slavefinished", node=self) |
| elif eventname == "logstart": |
| self.notify_inproc(eventname, node=self, **kwargs) |
| elif eventname in ("testreport", "collectreport", "teardownreport"): |
| item_index = kwargs.pop("item_index", None) |
| rep = unserialize_report(eventname, kwargs['data']) |
| if item_index is not None: |
| rep.item_index = item_index |
| self.notify_inproc(eventname, node=self, rep=rep) |
| elif eventname == "collectionfinish": |
| self.notify_inproc(eventname, node=self, ids=kwargs['ids']) |
| else: |
| raise ValueError("unknown event: %s" %(eventname,)) |
| except KeyboardInterrupt: |
| # should not land in receiver-thread |
| raise |
| except: |
| excinfo = py.code.ExceptionInfo() |
| py.builtin.print_("!" * 20, excinfo) |
| self.config.pluginmanager.notify_exception(excinfo) |
| |
| def unserialize_report(name, reportdict): |
| if name == "testreport": |
| return runner.TestReport(**reportdict) |
| elif name == "collectreport": |
| return runner.CollectReport(**reportdict) |