blob: 712ae01636abedeb5e9c8cf356551c5177e341fd [file] [log] [blame]
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import contextlib
import os
import random
import subprocess
import sys
import tempfile
import zipfile
from collections import namedtuple
from textwrap import dedent
from .bin.pex import log, main
from .common import safe_mkdir, safe_rmtree
from .compatibility import nested
from .installer import EggInstaller, Packager
from .pex_builder import PEXBuilder
from .util import DistributionHelper, named_temporary_file
@contextlib.contextmanager
def temporary_dir():
td = tempfile.mkdtemp()
try:
yield td
finally:
safe_rmtree(td)
@contextlib.contextmanager
def temporary_filename():
"""Creates a temporary filename.
This is useful when you need to pass a filename to an API. Windows requires all
handles to a file be closed before deleting/renaming it, so this makes it a bit
simpler."""
with named_temporary_file() as fp:
fp.write(b'')
fp.close()
yield fp.name
def random_bytes(length):
return ''.join(
map(chr, (random.randint(ord('a'), ord('z')) for _ in range(length)))).encode('utf-8')
@contextlib.contextmanager
def temporary_content(content_map, interp=None, seed=31337):
"""Write content to disk where content is map from string => (int, string).
If target is int, write int random bytes. Otherwise write contents of string."""
random.seed(seed)
interp = interp or {}
with temporary_dir() as td:
for filename, size_or_content in content_map.items():
safe_mkdir(os.path.dirname(os.path.join(td, filename)))
with open(os.path.join(td, filename), 'wb') as fp:
if isinstance(size_or_content, int):
fp.write(random_bytes(size_or_content))
else:
fp.write((size_or_content % interp).encode('utf-8'))
yield td
def yield_files(directory):
for root, _, files in os.walk(directory):
for f in files:
filename = os.path.join(root, f)
rel_filename = os.path.relpath(filename, directory)
yield filename, rel_filename
def write_zipfile(directory, dest, reverse=False):
with contextlib.closing(zipfile.ZipFile(dest, 'w')) as zf:
for filename, rel_filename in sorted(yield_files(directory), reverse=reverse):
zf.write(filename, arcname=rel_filename)
return dest
PROJECT_CONTENT = {
'setup.py': dedent('''
from setuptools import setup
setup(
name=%(project_name)r,
version='0.0.0',
zip_safe=%(zip_safe)r,
packages=['my_package'],
scripts=[
'scripts/hello_world',
'scripts/shell_script',
],
package_data={'my_package': ['package_data/*.dat']},
install_requires=%(install_requires)r,
)
'''),
'scripts/hello_world': '#!/usr/bin/env python\nprint("hello world!")\n',
'scripts/shell_script': '#!/usr/bin/env bash\necho hello world\n',
'my_package/__init__.py': 0,
'my_package/my_module.py': 'def do_something():\n print("hello world!")\n',
'my_package/package_data/resource1.dat': 1000,
'my_package/package_data/resource2.dat': 1000,
}
@contextlib.contextmanager
def make_installer(name='my_project', installer_impl=EggInstaller, zip_safe=True,
install_reqs=None):
interp = {'project_name': name, 'zip_safe': zip_safe, 'install_requires': install_reqs or []}
with temporary_content(PROJECT_CONTENT, interp=interp) as td:
yield installer_impl(td)
@contextlib.contextmanager
def make_source_dir(name='my_project', install_reqs=None):
interp = {'project_name': name, 'zip_safe': True, 'install_requires': install_reqs or []}
with temporary_content(PROJECT_CONTENT, interp=interp) as td:
yield td
def make_sdist(name='my_project', zip_safe=True, install_reqs=None):
with make_installer(name=name, installer_impl=Packager, zip_safe=zip_safe,
install_reqs=install_reqs) as packager:
return packager.sdist()
@contextlib.contextmanager
def make_bdist(name='my_project', installer_impl=EggInstaller, zipped=False, zip_safe=True):
with make_installer(name=name, installer_impl=installer_impl, zip_safe=zip_safe) as installer:
dist_location = installer.bdist()
if zipped:
yield DistributionHelper.distribution_from_path(dist_location)
else:
with temporary_dir() as td:
extract_path = os.path.join(td, os.path.basename(dist_location))
with contextlib.closing(zipfile.ZipFile(dist_location)) as zf:
zf.extractall(extract_path)
yield DistributionHelper.distribution_from_path(extract_path)
COVERAGE_PREAMBLE = """
try:
from coverage import coverage
cov = coverage(auto_data=True, data_suffix=True)
cov.start()
except ImportError:
pass
"""
def write_simple_pex(td, exe_contents, dists=None, coverage=False):
"""Write a pex file that contains an executable entry point
:param td: temporary directory path
:param exe_contents: entry point python file
:type exe_contents: string
:param dists: distributions to include, typically sdists or bdists
:param coverage: include coverage header
"""
dists = dists or []
with open(os.path.join(td, 'exe.py'), 'w') as fp:
fp.write(exe_contents)
pb = PEXBuilder(path=td, preamble=COVERAGE_PREAMBLE if coverage else None)
for dist in dists:
pb.add_egg(dist.location)
pb.set_executable(os.path.join(td, 'exe.py'))
pb.freeze()
return pb
class IntegResults(namedtuple('results', 'output return_code exception')):
"""Convenience object to return integration run results."""
def assert_success(self):
assert self.exception is None and self.return_code is None
def assert_failure(self):
assert self.exception or self.return_code
def run_pex_command(args, env=None):
"""Simulate running pex command for integration testing.
This is different from run_simple_pex in that it calls the pex command rather
than running a generated pex. This is useful for testing end to end runs
with specific command line arguments or env options.
"""
def logger_callback(_output):
def mock_logger(msg, v=None):
_output.append(msg)
return mock_logger
exception = None
error_code = None
output = []
log.set_logger(logger_callback(output))
try:
main(args=args)
except SystemExit as e:
error_code = e.code
except Exception as e:
exception = e
return IntegResults(output, error_code, exception)
# TODO(wickman) Why not PEX.run?
def run_simple_pex(pex, args=(), env=None):
po = subprocess.Popen(
[sys.executable, pex] + list(args),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env)
po.wait()
return po.stdout.read().replace(b'\r', b''), po.returncode
def run_simple_pex_test(body, args=(), env=None, dists=None, coverage=False):
with nested(temporary_dir(), temporary_dir()) as (td1, td2):
pb = write_simple_pex(td1, body, dists=dists, coverage=coverage)
pex = os.path.join(td2, 'app.pex')
pb.build(pex)
return run_simple_pex(pex, args=args, env=env)
def _iter_filter(data_dict):
fragment = '/%s/_pex/' % PEXBuilder.BOOTSTRAP_DIR
for filename, records in data_dict.items():
try:
bi = filename.index(fragment)
except ValueError:
continue
# rewrite to look like root source
yield ('pex/' + filename[bi + len():], records)
def combine_pex_coverage(coverage_file_iter):
from coverage.data import CoverageData
combined = CoverageData(basename='.coverage_combined')
for filename in coverage_file_iter:
cov = CoverageData(basename=filename)
cov.read()
combined.add_line_data(dict(_iter_filter(cov.line_data())))
combined.add_arc_data(dict(_iter_filter(cov.arc_data())))
combined.write()
return combined.filename