# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import absolute_import, print_function

import itertools
import os
import site
import sys
import uuid

from pkg_resources import (
    DistributionNotFound,
    Environment,
    Requirement,
    WorkingSet,
    find_distributions
)

from .common import die, open_zip, safe_mkdir, safe_rmtree
from .interpreter import PythonInterpreter
from .package import distribution_compatible
from .pex_builder import PEXBuilder
from .pex_info import PexInfo
from .tracer import TRACER
from .util import CacheHelper, DistributionHelper


class PEXEnvironment(Environment):
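  """A pkg_resources Environment that activates the distributions inside a PEX.

  A minimal usage sketch (the path below is illustrative):

    pex_path = '/path/to/app.pex'
    pex_info = PexInfo.from_pex(pex_path)
    working_set = PEXEnvironment(pex_path, pex_info).activate()
  """
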
@classmethod
def force_local(cls, pex, pex_info):
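    """Explode a zip-unsafe PEX into a cache directory keyed by its code hash.

    Returns the exploded directory, or the original ``pex`` path when no code hash is
    available to key the cache.
    """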
if pex_info.code_hash is None:
# Do not support force_local if code_hash is not set. (It should always be set.)
return pex
explode_dir = os.path.join(pex_info.zip_unsafe_cache, pex_info.code_hash)
TRACER.log('PEX is not zip safe, exploding to %s' % explode_dir)
if not os.path.exists(explode_dir):
explode_tmp = explode_dir + '.' + uuid.uuid4().hex
with TRACER.timed('Unzipping %s' % pex):
try:
safe_mkdir(explode_tmp)
with open_zip(pex) as pex_zip:
pex_files = (x for x in pex_zip.namelist()
if not x.startswith(PEXBuilder.BOOTSTRAP_DIR) and
not x.startswith(PexInfo.INTERNAL_CACHE))
pex_zip.extractall(explode_tmp, pex_files)
except: # noqa: T803
safe_rmtree(explode_tmp)
raise
TRACER.log('Renaming %s to %s' % (explode_tmp, explode_dir))
os.rename(explode_tmp, explode_dir)
    return explode_dir

  @classmethod
def update_module_paths(cls, new_code_path):
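    """Prepend ``new_code_path`` to sys.path and to the __path__ of already-imported packages."""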
# Force subsequent imports to come from the .pex directory rather than the .pex file.
TRACER.log('Adding to the head of sys.path: %s' % new_code_path)
sys.path.insert(0, new_code_path)
for name, module in sys.modules.items():
if hasattr(module, "__path__"):
module_dir = os.path.join(new_code_path, *name.split("."))
TRACER.log('Adding to the head of %s.__path__: %s' % (module.__name__, module_dir))
        module.__path__.insert(0, module_dir)

  @classmethod
def write_zipped_internal_cache(cls, pex, pex_info):
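    """Extract distributions from a zipped PEX's internal cache into the install cache.

    Returns a tuple of three distribution lists: those already present in the install cache,
    those newly written to it, and those that are zip-safe and left in place.
    """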
prefix_length = len(pex_info.internal_cache) + 1
existing_cached_distributions = []
newly_cached_distributions = []
zip_safe_distributions = []
with open_zip(pex) as zf:
# Distribution names are the first element after ".deps/" and before the next "/"
distribution_names = set(filter(None, (filename[prefix_length:].split('/')[0]
for filename in zf.namelist() if filename.startswith(pex_info.internal_cache))))
      # Create Distribution objects from these names, caching them to disk if necessary.
for distribution_name in distribution_names:
internal_dist_path = '/'.join([pex_info.internal_cache, distribution_name])
        # First, check whether this distribution is already present in the install cache.
dist_digest = pex_info.distributions.get(distribution_name) or CacheHelper.zip_hash(
zf, internal_dist_path)
cached_location = os.path.join(pex_info.install_cache, '%s.%s' % (
distribution_name, dist_digest))
if os.path.exists(cached_location):
dist = DistributionHelper.distribution_from_path(cached_location)
if dist is not None:
existing_cached_distributions.append(dist)
continue
else:
dist = DistributionHelper.distribution_from_path(os.path.join(pex, internal_dist_path))
if dist is not None:
if DistributionHelper.zipsafe(dist) and not pex_info.always_write_cache:
zip_safe_distributions.append(dist)
continue
with TRACER.timed('Caching %s' % dist):
newly_cached_distributions.append(
CacheHelper.cache_distribution(zf, internal_dist_path, cached_location))
    return existing_cached_distributions, newly_cached_distributions, zip_safe_distributions

  @classmethod
def load_internal_cache(cls, pex, pex_info):
"""Possibly cache out the internal cache."""
internal_cache = os.path.join(pex, pex_info.internal_cache)
with TRACER.timed('Searching dependency cache: %s' % internal_cache, V=2):
if os.path.isdir(pex):
for dist in find_distributions(internal_cache):
yield dist
else:
for dist in itertools.chain(*cls.write_zipped_internal_cache(pex, pex_info)):
          yield dist

  def __init__(self, pex, pex_info, interpreter=None, **kw):
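    """
    :param pex: Path to the PEX file or exploded PEX directory.
    :param pex_info: The PexInfo metadata describing the PEX.
    :param interpreter: The PythonInterpreter to resolve against; defaults to the current
      interpreter.
    """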
self._internal_cache = os.path.join(pex, pex_info.internal_cache)
self._pex = pex
self._pex_info = pex_info
self._activated = False
self._working_set = None
self._interpreter = interpreter or PythonInterpreter.get()
super(PEXEnvironment, self).__init__(
        search_path=sys.path if pex_info.inherit_path else [], **kw)

  def update_candidate_distributions(self, distribution_iter):
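    """Add the compatible distributions from ``distribution_iter`` to this environment."""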
for dist in distribution_iter:
if self.can_add(dist):
with TRACER.timed('Adding %s' % dist, V=2):
          self.add(dist)

  def can_add(self, dist):
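    """Check whether ``dist`` is compatible with this environment's interpreter and platform."""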
    return distribution_compatible(dist, self._interpreter, self.platform)

  def activate(self):
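    """Activate the PEX environment and return its WorkingSet, memoizing the result."""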
if not self._activated:
with TRACER.timed('Activating PEX virtual environment from %s' % self._pex):
self._working_set = self._activate()
self._activated = True
    return self._working_set

  def _resolve(self, working_set, reqs):
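    """Resolve ``reqs`` against ``working_set``, returning the set of resolved distributions.

    Unless ``pex_info.ignore_errors`` is set, dies if any requirement cannot be satisfied.
    """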
reqs = reqs[:]
unresolved_reqs = set()
resolveds = set()
# Resolve them one at a time so that we can figure out which ones we need to elide should
# there be an interpreter incompatibility.
for req in reqs:
with TRACER.timed('Resolving %s' % req, V=2):
try:
resolveds.update(working_set.resolve([req], env=self))
except DistributionNotFound as e:
TRACER.log('Failed to resolve a requirement: %s' % e)
unresolved_reqs.add(e.args[0].project_name)
# Older versions of pkg_resources just call `DistributionNotFound(req)` instead of the
# modern `DistributionNotFound(req, requirers)` and so we may not have the 2nd requirers
# slot at all.
if len(e.args) >= 2 and e.args[1]:
unresolved_reqs.update(e.args[1])
    unresolved_reqs = set(req.lower() for req in unresolved_reqs)
if unresolved_reqs:
TRACER.log('Unresolved requirements:')
for req in unresolved_reqs:
TRACER.log(' - %s' % req)
TRACER.log('Distributions contained within this pex:')
if not self._pex_info.distributions:
TRACER.log(' None')
else:
for dist in self._pex_info.distributions:
TRACER.log(' - %s' % dist)
if not self._pex_info.ignore_errors:
die('Failed to execute PEX file, missing compatible dependencies for:\n%s' % (
'\n'.join(map(str, unresolved_reqs))))
    return resolveds

  def _activate(self):
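    """Load the internal cache, then resolve and activate all requirements of the PEX."""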
self.update_candidate_distributions(self.load_internal_cache(self._pex, self._pex_info))
if not self._pex_info.zip_safe and os.path.isfile(self._pex):
self.update_module_paths(self.force_local(self._pex, self._pex_info))
all_reqs = [Requirement.parse(req) for req in self._pex_info.requirements]
working_set = WorkingSet([])
resolved = self._resolve(working_set, all_reqs)
for dist in resolved:
with TRACER.timed('Activating %s' % dist, V=2):
working_set.add(dist)
if os.path.isdir(dist.location):
with TRACER.timed('Adding sitedir', V=2):
site.addsitedir(dist.location)
dist.activate()
return working_set