| """ |
| Utility functions for |
| |
| - building and importing modules on test time, using a temporary location |
| - detecting if compilers are present |
| - determining paths to tests |
| |
| """ |
| import glob |
| import os |
| import sys |
| import subprocess |
| import tempfile |
| import shutil |
| import atexit |
| import textwrap |
| import re |
| import pytest |
| import contextlib |
| import numpy |
| |
| from pathlib import Path |
| from numpy.compat import asbytes, asstr |
| from numpy.testing import temppath, IS_WASM |
| from importlib import import_module |
| |
| # |
| # Maintaining a temporary module directory |
| # |
| |
| _module_dir = None |
| _module_num = 5403 |
| |
| if sys.platform == "cygwin": |
| NUMPY_INSTALL_ROOT = Path(__file__).parent.parent.parent |
| _module_list = list(NUMPY_INSTALL_ROOT.glob("**/*.dll")) |
| |
| |
| def _cleanup(): |
| global _module_dir |
| if _module_dir is not None: |
| try: |
| sys.path.remove(_module_dir) |
| except ValueError: |
| pass |
| try: |
| shutil.rmtree(_module_dir) |
| except OSError: |
| pass |
| _module_dir = None |
| |
| |
| def get_module_dir(): |
| global _module_dir |
| if _module_dir is None: |
| _module_dir = tempfile.mkdtemp() |
| atexit.register(_cleanup) |
| if _module_dir not in sys.path: |
| sys.path.insert(0, _module_dir) |
| return _module_dir |
| |
| |
| def get_temp_module_name(): |
| # Assume single-threaded, and the module dir usable only by this thread |
| global _module_num |
| get_module_dir() |
| name = "_test_ext_module_%d" % _module_num |
| _module_num += 1 |
| if name in sys.modules: |
| # this should not be possible, but check anyway |
| raise RuntimeError("Temporary module name already in use.") |
| return name |
| |
| |
| def _memoize(func): |
| memo = {} |
| |
| def wrapper(*a, **kw): |
| key = repr((a, kw)) |
| if key not in memo: |
| try: |
| memo[key] = func(*a, **kw) |
| except Exception as e: |
| memo[key] = e |
| raise |
| ret = memo[key] |
| if isinstance(ret, Exception): |
| raise ret |
| return ret |
| |
| wrapper.__name__ = func.__name__ |
| return wrapper |
| |
| |
| # |
| # Building modules |
| # |
| |
| |
| @_memoize |
| def build_module(source_files, options=[], skip=[], only=[], module_name=None): |
| """ |
| Compile and import a f2py module, built from the given files. |
| |
| """ |
| |
| code = f"import sys; sys.path = {sys.path!r}; import numpy.f2py; numpy.f2py.main()" |
| |
| d = get_module_dir() |
| |
| # Copy files |
| dst_sources = [] |
| f2py_sources = [] |
| for fn in source_files: |
| if not os.path.isfile(fn): |
| raise RuntimeError("%s is not a file" % fn) |
| dst = os.path.join(d, os.path.basename(fn)) |
| shutil.copyfile(fn, dst) |
| dst_sources.append(dst) |
| |
| base, ext = os.path.splitext(dst) |
| if ext in (".f90", ".f", ".c", ".pyf"): |
| f2py_sources.append(dst) |
| |
| assert f2py_sources |
| |
| # Prepare options |
| if module_name is None: |
| module_name = get_temp_module_name() |
| f2py_opts = ["-c", "-m", module_name] + options + f2py_sources |
| if skip: |
| f2py_opts += ["skip:"] + skip |
| if only: |
| f2py_opts += ["only:"] + only |
| |
| # Build |
| cwd = os.getcwd() |
| try: |
| os.chdir(d) |
| cmd = [sys.executable, "-c", code] + f2py_opts |
| p = subprocess.Popen(cmd, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT) |
| out, err = p.communicate() |
| if p.returncode != 0: |
| raise RuntimeError("Running f2py failed: %s\n%s" % |
| (cmd[4:], asstr(out))) |
| finally: |
| os.chdir(cwd) |
| |
| # Partial cleanup |
| for fn in dst_sources: |
| os.unlink(fn) |
| |
| # Rebase (Cygwin-only) |
| if sys.platform == "cygwin": |
| # If someone starts deleting modules after import, this will |
| # need to change to record how big each module is, rather than |
| # relying on rebase being able to find that from the files. |
| _module_list.extend( |
| glob.glob(os.path.join(d, "{:s}*".format(module_name))) |
| ) |
| subprocess.check_call( |
| ["/usr/bin/rebase", "--database", "--oblivious", "--verbose"] |
| + _module_list |
| ) |
| |
| |
| |
| # Import |
| return import_module(module_name) |
| |
| |
| @_memoize |
| def build_code(source_code, |
| options=[], |
| skip=[], |
| only=[], |
| suffix=None, |
| module_name=None): |
| """ |
| Compile and import Fortran code using f2py. |
| |
| """ |
| if suffix is None: |
| suffix = ".f" |
| with temppath(suffix=suffix) as path: |
| with open(path, "w") as f: |
| f.write(source_code) |
| return build_module([path], |
| options=options, |
| skip=skip, |
| only=only, |
| module_name=module_name) |
| |
| |
| # |
| # Check if compilers are available at all... |
| # |
| |
| _compiler_status = None |
| |
| |
| def _get_compiler_status(): |
| global _compiler_status |
| if _compiler_status is not None: |
| return _compiler_status |
| |
| _compiler_status = (False, False, False) |
| if IS_WASM: |
| # Can't run compiler from inside WASM. |
| return _compiler_status |
| |
| # XXX: this is really ugly. But I don't know how to invoke Distutils |
| # in a safer way... |
| code = textwrap.dedent(f"""\ |
| import os |
| import sys |
| sys.path = {repr(sys.path)} |
| |
| def configuration(parent_name='',top_path=None): |
| global config |
| from numpy.distutils.misc_util import Configuration |
| config = Configuration('', parent_name, top_path) |
| return config |
| |
| from numpy.distutils.core import setup |
| setup(configuration=configuration) |
| |
| config_cmd = config.get_config_cmd() |
| have_c = config_cmd.try_compile('void foo() {{}}') |
| print('COMPILERS:%%d,%%d,%%d' %% (have_c, |
| config.have_f77c(), |
| config.have_f90c())) |
| sys.exit(99) |
| """) |
| code = code % dict(syspath=repr(sys.path)) |
| |
| tmpdir = tempfile.mkdtemp() |
| try: |
| script = os.path.join(tmpdir, "setup.py") |
| |
| with open(script, "w") as f: |
| f.write(code) |
| |
| cmd = [sys.executable, "setup.py", "config"] |
| p = subprocess.Popen(cmd, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| cwd=tmpdir) |
| out, err = p.communicate() |
| finally: |
| shutil.rmtree(tmpdir) |
| |
| m = re.search(br"COMPILERS:(\d+),(\d+),(\d+)", out) |
| if m: |
| _compiler_status = ( |
| bool(int(m.group(1))), |
| bool(int(m.group(2))), |
| bool(int(m.group(3))), |
| ) |
| # Finished |
| return _compiler_status |
| |
| |
| def has_c_compiler(): |
| return _get_compiler_status()[0] |
| |
| |
| def has_f77_compiler(): |
| return _get_compiler_status()[1] |
| |
| |
| def has_f90_compiler(): |
| return _get_compiler_status()[2] |
| |
| |
| # |
| # Building with distutils |
| # |
| |
| |
| @_memoize |
| def build_module_distutils(source_files, config_code, module_name, **kw): |
| """ |
| Build a module via distutils and import it. |
| |
| """ |
| d = get_module_dir() |
| |
| # Copy files |
| dst_sources = [] |
| for fn in source_files: |
| if not os.path.isfile(fn): |
| raise RuntimeError("%s is not a file" % fn) |
| dst = os.path.join(d, os.path.basename(fn)) |
| shutil.copyfile(fn, dst) |
| dst_sources.append(dst) |
| |
| # Build script |
| config_code = textwrap.dedent(config_code).replace("\n", "\n ") |
| |
| code = fr""" |
| import os |
| import sys |
| sys.path = {repr(sys.path)} |
| |
| def configuration(parent_name='',top_path=None): |
| from numpy.distutils.misc_util import Configuration |
| config = Configuration('', parent_name, top_path) |
| {config_code} |
| return config |
| |
| if __name__ == "__main__": |
| from numpy.distutils.core import setup |
| setup(configuration=configuration) |
| """ |
| script = os.path.join(d, get_temp_module_name() + ".py") |
| dst_sources.append(script) |
| with open(script, "wb") as f: |
| f.write(asbytes(code)) |
| |
| # Build |
| cwd = os.getcwd() |
| try: |
| os.chdir(d) |
| cmd = [sys.executable, script, "build_ext", "-i"] |
| p = subprocess.Popen(cmd, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT) |
| out, err = p.communicate() |
| if p.returncode != 0: |
| raise RuntimeError("Running distutils build failed: %s\n%s" % |
| (cmd[4:], asstr(out))) |
| finally: |
| os.chdir(cwd) |
| |
| # Partial cleanup |
| for fn in dst_sources: |
| os.unlink(fn) |
| |
| # Import |
| __import__(module_name) |
| return sys.modules[module_name] |
| |
| |
| # |
| # Unittest convenience |
| # |
| |
| |
| class F2PyTest: |
| code = None |
| sources = None |
| options = [] |
| skip = [] |
| only = [] |
| suffix = ".f" |
| module = None |
| |
| @property |
| def module_name(self): |
| cls = type(self) |
| return f'_{cls.__module__.rsplit(".",1)[-1]}_{cls.__name__}_ext_module' |
| |
| def setup_method(self): |
| if sys.platform == "win32": |
| pytest.skip("Fails with MinGW64 Gfortran (Issue #9673)") |
| |
| if self.module is not None: |
| return |
| |
| # Check compiler availability first |
| if not has_c_compiler(): |
| pytest.skip("No C compiler available") |
| |
| codes = [] |
| if self.sources: |
| codes.extend(self.sources) |
| if self.code is not None: |
| codes.append(self.suffix) |
| |
| needs_f77 = False |
| needs_f90 = False |
| needs_pyf = False |
| for fn in codes: |
| if str(fn).endswith(".f"): |
| needs_f77 = True |
| elif str(fn).endswith(".f90"): |
| needs_f90 = True |
| elif str(fn).endswith(".pyf"): |
| needs_pyf = True |
| if needs_f77 and not has_f77_compiler(): |
| pytest.skip("No Fortran 77 compiler available") |
| if needs_f90 and not has_f90_compiler(): |
| pytest.skip("No Fortran 90 compiler available") |
| if needs_pyf and not (has_f90_compiler() or has_f77_compiler()): |
| pytest.skip("No Fortran compiler available") |
| |
| # Build the module |
| if self.code is not None: |
| self.module = build_code( |
| self.code, |
| options=self.options, |
| skip=self.skip, |
| only=self.only, |
| suffix=self.suffix, |
| module_name=self.module_name, |
| ) |
| |
| if self.sources is not None: |
| self.module = build_module( |
| self.sources, |
| options=self.options, |
| skip=self.skip, |
| only=self.only, |
| module_name=self.module_name, |
| ) |
| |
| |
| # |
| # Helper functions |
| # |
| |
| |
| def getpath(*a): |
| # Package root |
| d = Path(numpy.f2py.__file__).parent.resolve() |
| return d.joinpath(*a) |
| |
| |
| @contextlib.contextmanager |
| def switchdir(path): |
| curpath = Path.cwd() |
| os.chdir(path) |
| try: |
| yield |
| finally: |
| os.chdir(curpath) |