blob: 11a98bbac77b9abd25910ea9d21d22db4596c38c [file] [log] [blame]
#!/usr/bin/env python3
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module which provides compatibility with older Python versions."""
__all__ = ["PY3", "int", "long", "xrange", "exec_", "callable", "namedtuple",
"property", "wraps", "defaultdict", "update_wrapper", "lru_cache"]
import sys
try:
import __builtin__
except ImportError:
import builtins as __builtin__ # py3
PY3 = sys.version_info[0] == 3
if PY3:
int = int
long = int
xrange = range
unicode = str
exec_ = getattr(__builtin__, "exec")
print_ = getattr(__builtin__, "print")
def u(s):
return s
def b(s):
return s.encode("latin-1")
else:
int = int
long = long
xrange = xrange
unicode = unicode
def u(s):
return unicode(s, "unicode_escape")
def b(s):
return s
def exec_(code, globs=None, locs=None):
if globs is None:
frame = _sys._getframe(1)
globs = frame.f_globals
if locs is None:
locs = frame.f_locals
del frame
elif locs is None:
locs = globs
exec("""exec code in globs, locs""")
def print_(s):
sys.stdout.write(s + '\n')
sys.stdout.flush()
# removed in 3.0, reintroduced in 3.2
try:
callable = callable
except NameError:
def callable(obj):
return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)
# --- stdlib additions
# py 2.6 collections.namedtuple
# Taken from: http://code.activestate.com/recipes/500261/
# Credits: Raymond Hettinger
try:
from collections import namedtuple
except ImportError:
from operator import itemgetter as _itemgetter
from keyword import iskeyword as _iskeyword
import sys as _sys
def namedtuple(typename, field_names, verbose=False, rename=False):
"""A collections.namedtuple implementation, see:
http://docs.python.org/library/collections.html#namedtuple
"""
if isinstance(field_names, basestring):
field_names = field_names.replace(',', ' ').split()
field_names = tuple(map(str, field_names))
if rename:
names = list(field_names)
seen = set()
for i, name in enumerate(names):
if ((not min(c.isalnum() or c == '_' for c in name)
or _iskeyword(name)
or not name or name[0].isdigit()
or name.startswith('_')
or name in seen)):
names[i] = '_%d' % i
seen.add(name)
field_names = tuple(names)
for name in (typename,) + field_names:
if not min(c.isalnum() or c == '_' for c in name):
raise ValueError('Type names and field names can only contain '
'alphanumeric characters and underscores: %r'
% name)
if _iskeyword(name):
raise ValueError('Type names and field names cannot be a '
'keyword: %r' % name)
if name[0].isdigit():
raise ValueError('Type names and field names cannot start '
'with a number: %r' % name)
seen_names = set()
for name in field_names:
if name.startswith('_') and not rename:
raise ValueError(
'Field names cannot start with an underscore: %r' % name)
if name in seen_names:
raise ValueError('Encountered duplicate field name: %r' % name)
seen_names.add(name)
numfields = len(field_names)
argtxt = repr(field_names).replace("'", "")[1:-1]
reprtxt = ', '.join('%s=%%r' % name for name in field_names)
template = '''class %(typename)s(tuple):
'%(typename)s(%(argtxt)s)' \n
__slots__ = () \n
_fields = %(field_names)r \n
def __new__(_cls, %(argtxt)s):
return _tuple.__new__(_cls, (%(argtxt)s)) \n
@classmethod
def _make(cls, iterable, new=tuple.__new__, len=len):
'Make a new %(typename)s object from a sequence or iterable'
result = new(cls, iterable)
if len(result) != %(numfields)d:
raise TypeError(
'Expected %(numfields)d arguments, got %%d' %% len(result))
return result \n
def __repr__(self):
return '%(typename)s(%(reprtxt)s)' %% self \n
def _asdict(self):
'Return a new dict which maps field names to their values'
return dict(zip(self._fields, self)) \n
def _replace(_self, **kwds):
result = _self._make(map(kwds.pop, %(field_names)r, _self))
if kwds:
raise ValueError(
'Got unexpected field names: %%r' %% kwds.keys())
return result \n
def __getnewargs__(self):
return tuple(self) \n\n''' % locals()
for i, name in enumerate(field_names):
template += ' %s = _property(_itemgetter(%d))\n' % (name, i)
if verbose:
sys.stdout.write(template + '\n')
sys.stdout.flush()
namespace = dict(
_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
_property=property, _tuple=tuple)
try:
exec_(template, namespace)
except SyntaxError:
e = sys.exc_info()[1]
raise SyntaxError(e.message + ':\n' + template)
result = namespace[typename]
try:
result.__module__ = _sys._getframe(
1).f_globals.get('__name__', '__main__')
except (AttributeError, ValueError):
pass
return result
# hack to support property getter/setter/deleter on python < 2.6
# http://docs.python.org/library/functions.html?highlight=property#property
if hasattr(property, 'setter'):
property = property
else:
class property(__builtin__.property):
__metaclass__ = type
def __init__(self, fget, *args, **kwargs):
super(property, self).__init__(fget, *args, **kwargs)
self.__doc__ = fget.__doc__
def getter(self, method):
return property(method, self.fset, self.fdel)
def setter(self, method):
return property(self.fget, method, self.fdel)
def deleter(self, method):
return property(self.fget, self.fset, method)
# py 2.5 collections.defauldict
# Taken from:
# http://code.activestate.com/recipes/523034-emulate-collectionsdefaultdict/
# Credits: Jason Kirtland
try:
from collections import defaultdict
except ImportError:
class defaultdict(dict):
"""Dict subclass that calls a factory function to supply
missing values:
http://docs.python.org/library/collections.html#collections.defaultdict
"""
def __init__(self, default_factory=None, *a, **kw):
if ((default_factory is not None and
not hasattr(default_factory, '__call__'))):
raise TypeError('first argument must be callable')
dict.__init__(self, *a, **kw)
self.default_factory = default_factory
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
except KeyError:
return self.__missing__(key)
def __missing__(self, key):
if self.default_factory is None:
raise KeyError(key)
self[key] = value = self.default_factory()
return value
def __reduce__(self):
if self.default_factory is None:
args = tuple()
else:
args = self.default_factory,
return type(self), args, None, None, self.items()
def copy(self):
return self.__copy__()
def __copy__(self):
return type(self)(self.default_factory, self)
def __deepcopy__(self, memo):
import copy
return type(self)(self.default_factory,
copy.deepcopy(self.items()))
def __repr__(self):
return 'defaultdict(%s, %s)' % (self.default_factory,
dict.__repr__(self))
# py 2.5 functools.wraps
try:
from functools import wraps
except ImportError:
def wraps(original):
def inner(fn):
for attribute in ['__module__', '__name__', '__doc__']:
setattr(fn, attribute, getattr(original, attribute))
for attribute in ['__dict__']:
if hasattr(fn, attribute):
getattr(fn, attribute).update(getattr(original, attribute))
else:
setattr(fn, attribute,
getattr(original, attribute).copy())
return fn
return inner
# py 2.5 functools.update_wrapper
try:
from functools import update_wrapper
except ImportError:
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__')
WRAPPER_UPDATES = ('__dict__',)
def update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
updated=WRAPPER_UPDATES):
"""Update a wrapper function to look like the wrapped function, see:
http://docs.python.org/library/functools.html#functools.update_wrapper
"""
for attr in assigned:
setattr(wrapper, attr, getattr(wrapped, attr))
for attr in updated:
getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
return wrapper
# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
try:
from functools import lru_cache
except ImportError:
try:
from threading import RLock
except ImportError:
from dummy_threading import RLock
_CacheInfo = namedtuple("CacheInfo",
["hits", "misses", "maxsize", "currsize"])
class _HashedSeq(list):
__slots__ = 'hashvalue'
def __init__(self, tup, hash=hash):
self[:] = tup
self.hashvalue = hash(tup)
def __hash__(self):
return self.hashvalue
def _make_key(args, kwds, typed,
kwd_mark=(object(), ),
fasttypes=set((int, str, frozenset, type(None))),
sorted=sorted, tuple=tuple, type=type, len=len):
key = args
if kwds:
sorted_items = sorted(kwds.items())
key += kwd_mark
for item in sorted_items:
key += item
if typed:
key += tuple(type(v) for v in args)
if kwds:
key += tuple(type(v) for k, v in sorted_items)
elif len(key) == 1 and type(key[0]) in fasttypes:
return key[0]
return _HashedSeq(key)
def lru_cache(maxsize=100, typed=False):
"""Least-recently-used cache decorator, see:
http://docs.python.org/3/library/functools.html#functools.lru_cache
"""
def decorating_function(user_function):
cache = dict()
stats = [0, 0]
HITS, MISSES = 0, 1
make_key = _make_key
cache_get = cache.get
_len = len
lock = RLock()
root = []
root[:] = [root, root, None, None]
nonlocal_root = [root]
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
if maxsize == 0:
def wrapper(*args, **kwds):
result = user_function(*args, **kwds)
stats[MISSES] += 1
return result
elif maxsize is None:
def wrapper(*args, **kwds):
key = make_key(args, kwds, typed)
result = cache_get(key, root)
if result is not root:
stats[HITS] += 1
return result
result = user_function(*args, **kwds)
cache[key] = result
stats[MISSES] += 1
return result
else:
def wrapper(*args, **kwds):
if kwds or typed:
key = make_key(args, kwds, typed)
else:
key = args
lock.acquire()
try:
link = cache_get(key)
if link is not None:
root, = nonlocal_root
link_prev, link_next, key, result = link
link_prev[NEXT] = link_next
link_next[PREV] = link_prev
last = root[PREV]
last[NEXT] = root[PREV] = link
link[PREV] = last
link[NEXT] = root
stats[HITS] += 1
return result
finally:
lock.release()
result = user_function(*args, **kwds)
lock.acquire()
try:
root, = nonlocal_root
if key in cache:
pass
elif _len(cache) >= maxsize:
oldroot = root
oldroot[KEY] = key
oldroot[RESULT] = result
root = nonlocal_root[0] = oldroot[NEXT]
oldkey = root[KEY]
root[KEY] = root[RESULT] = None
del cache[oldkey]
cache[key] = oldroot
else:
last = root[PREV]
link = [last, root, key, result]
last[NEXT] = root[PREV] = cache[key] = link
stats[MISSES] += 1
finally:
lock.release()
return result
def cache_info():
"""Report cache statistics"""
lock.acquire()
try:
return _CacheInfo(stats[HITS], stats[MISSES], maxsize,
len(cache))
finally:
lock.release()
def cache_clear():
"""Clear the cache and cache statistics"""
lock.acquire()
try:
cache.clear()
root = nonlocal_root[0]
root[:] = [root, root, None, None]
stats[:] = [0, 0]
finally:
lock.release()
wrapper.__wrapped__ = user_function
wrapper.cache_info = cache_info
wrapper.cache_clear = cache_clear
return update_wrapper(wrapper, user_function)
return decorating_function