blob: 27fb236b3b7d5d1c5011c6564b363bfa0c58e50f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from functools import wraps
from types import MethodType
from ...exceptions import AriaException
from ...utils.collections import FrozenDict, FrozenList, deepcopy_with_locators, merge, OrderedDict
from ...utils.caching import cachedmethod
from ...utils.console import puts
from ...utils.formatting import as_raw, safe_repr
from ...utils.type import full_type_name
from ...utils.exceptions import print_exception
from ..exceptions import InvalidValueError
from .null import NULL
from .utils import validate_primitive
#
# Class decorators
#
# pylint: disable=unused-argument
def has_fields(cls):
"""
Class decorator for validated field support.
1. Adds a ``FIELDS`` class property that is a dict of all the fields. Will inherit and merge
``FIELDS`` properties from base classes if they have them.
2. Generates automatic ``@property`` implementations for the fields with the help of a set of
special function decorators.
The class also works with the Python dict protocol, so that fields can be accessed via dict
semantics. The functionality is identical to that of using attribute access.
The class will also gain two utility methods, ``_iter_field_names`` and ``_iter_fields``.
"""
# Make sure we have FIELDS
if 'FIELDS' not in cls.__dict__:
setattr(cls, 'FIELDS', OrderedDict())
# Inherit FIELDS from base classes
for base in cls.__bases__:
if hasattr(base, 'FIELDS'):
cls.FIELDS.update(base.FIELDS)
# We could do this:
#
# for name, field in cls.__dict__.iteritems():
#
# But dir() is better because it has a deterministic order (alphabetical)
for name in dir(cls):
field = getattr(cls, name)
if isinstance(field, Field):
# Accumulate
cls.FIELDS[name] = field
field.name = name
field.container_cls = cls
# This function is here just to create an enclosed scope for "field"
def closure(field):
# By convention, we have the getter wrap the original function.
# (It is, for example, where the Python help() function will look for
# docstrings when encountering a property.)
@cachedmethod
@wraps(field.func)
def getter(self):
return field.get(self, None)
def setter(self, value):
field.set(self, None, value)
# Convert to Python property
return property(fget=getter, fset=setter)
setattr(cls, name, closure(field))
# Bind methods
setattr(cls, '_iter_field_names', MethodType(has_fields_iter_field_names, None, cls))
setattr(cls, '_iter_fields', MethodType(has_fields_iter_fields, None, cls))
# Behave like a dict
setattr(cls, '__len__', MethodType(has_fields_len, None, cls))
setattr(cls, '__getitem__', MethodType(has_fields_getitem, None, cls))
setattr(cls, '__setitem__', MethodType(has_fields_setitem, None, cls))
setattr(cls, '__delitem__', MethodType(has_fields_delitem, None, cls))
setattr(cls, '__iter__', MethodType(has_fields_iter, None, cls))
setattr(cls, '__contains__', MethodType(has_fields_contains, None, cls))
return cls
def short_form_field(name):
"""
Class decorator for specifying the short form field.
The class must be decorated with :func:`has_fields`.
"""
def decorator(cls):
if hasattr(cls, name) and hasattr(cls, 'FIELDS') and (name in cls.FIELDS):
setattr(cls, 'SHORT_FORM_FIELD', name)
return cls
else:
raise AttributeError('@short_form_field must be used with '
'a Field name in @has_fields class')
return decorator
def allow_unknown_fields(cls):
"""
Class decorator specifying that the class allows unknown fields.
The class must be decorated with :func:`has_fields`.
"""
if hasattr(cls, 'FIELDS'):
setattr(cls, 'ALLOW_UNKNOWN_FIELDS', True)
return cls
else:
raise AttributeError('@allow_unknown_fields must be used with a @has_fields class')
#
# Method decorators
#
def primitive_field(cls=None, default=None, allowed=None, required=False):
"""
Method decorator for primitive fields.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='primitive', func=func, cls=cls, default=default,
allowed=allowed, required=required)
return decorator
def primitive_list_field(cls=None, default=None, allowed=None, required=False):
"""
Method decorator for list of primitive fields.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='primitive_list', func=func, cls=cls, default=default,
allowed=allowed, required=required)
return decorator
def primitive_dict_field(cls=None, default=None, allowed=None, required=False):
"""
Method decorator for dict of primitive fields.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='primitive_dict', func=func, cls=cls, default=default,
allowed=allowed, required=required)
return decorator
def primitive_dict_unknown_fields(cls=None, default=None, allowed=None, required=False):
"""
Method decorator for dict of primitive fields, for all the fields that are
not already decorated.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='primitive_dict_unknown_fields', func=func, cls=cls,
default=default, allowed=allowed, required=required)
return decorator
def object_field(cls, default=None, allowed=None, required=False):
"""
Method decorator for object fields.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='object', func=func, cls=cls, default=default, allowed=allowed,
required=required)
return decorator
def object_list_field(cls, default=None, allowed=None, required=False):
"""
Method decorator for list of object fields.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='object_list', func=func, cls=cls, default=default,
allowed=allowed, required=required)
return decorator
def object_dict_field(cls, default=None, allowed=None, required=False):
"""
Method decorator for dict of object fields.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='object_dict', func=func, cls=cls, default=default,
allowed=allowed, required=required)
return decorator
def object_sequenced_list_field(cls, default=None, allowed=None, required=False):
"""
Method decorator for sequenced list of object fields.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='sequenced_object_list', func=func, cls=cls, default=default,
allowed=allowed, required=required)
return decorator
def object_dict_unknown_fields(cls, default=None, allowed=None, required=False):
"""
Method decorator for dict of object fields, for all the fields that are not already decorated.
The function must be a method in a class decorated with :func:`has_fields`.
"""
def decorator(func):
return Field(field_variant='object_dict_unknown_fields', func=func, cls=cls,
default=default, allowed=allowed, required=required)
return decorator
def field_getter(getter_func):
"""
Method decorator for overriding the getter function of a field.
The signature of the getter function must be: ``f(field, presentation, context)``.
The default getter can be accessed as ```field.default_get(presentation, context)``.
The function must already be decorated with a field decorator.
"""
def decorator(field):
if isinstance(field, Field):
field.get = MethodType(getter_func, field, Field)
return field
else:
raise AttributeError('@field_getter must be used with a Field')
return decorator
def field_setter(setter_func):
"""
Method decorator for overriding the setter function of a field.
The signature of the setter function must be: ``f(field, presentation, context, value)``.
The default setter can be accessed as ``field.default_set(presentation, context, value)``.
The function must already be decorated with a field decorator.
"""
def decorator(field):
if isinstance(field, Field):
field.set = MethodType(setter_func, field, Field)
return field
else:
raise AttributeError('@field_setter must be used with a Field')
return decorator
def field_validator(validator_fn):
"""
Method decorator for overriding the validator function of a field.
The signature of the validator function must be: ``f(field, presentation, context)``.
The default validator can be accessed as ``field.default_validate(presentation, context)``.
The function must already be decorated with a field decorator.
"""
def decorator(field):
if isinstance(field, Field):
field.validate = MethodType(validator_fn, field, Field)
return field
else:
raise AttributeError('@field_validator must be used with a Field')
return decorator
#
# Utils
#
def has_fields_iter_field_names(self):
for name in self.__class__.FIELDS:
yield name
def has_fields_iter_fields(self):
return self.FIELDS.iteritems()
def has_fields_len(self):
return len(self.__class__.FIELDS)
def has_fields_getitem(self, key):
if not isinstance(key, basestring):
raise TypeError('key must be a string')
if key not in self.__class__.FIELDS:
raise KeyError('no \'{0}\' property'.format(key))
return getattr(self, key)
def has_fields_setitem(self, key, value):
if not isinstance(key, basestring):
raise TypeError('key must be a string')
if key not in self.__class__.FIELDS:
raise KeyError('no \'{0}\' property'.format(key))
return setattr(self, key, value)
def has_fields_delitem(self, key):
if not isinstance(key, basestring):
raise TypeError('key must be a string')
if key not in self.__class__.FIELDS:
raise KeyError('no \'{0}\' property'.format(key))
return setattr(self, key, None)
def has_fields_iter(self):
return self.__class__.FIELDS.iterkeys()
def has_fields_contains(self, key):
if not isinstance(key, basestring):
raise TypeError('key must be a string')
return key in self.__class__.FIELDS
class Field(object):
"""
Field handler used by ``@has_fields`` decorator.
"""
def __init__(self, field_variant, func, cls=None, default=None, allowed=None, required=False):
if cls == str:
# Use "unicode" instead of "str"
cls = unicode
self.container_cls = None
self.name = None
self.field_variant = field_variant
self.func = func
self.cls = cls
self.default = default
self.allowed = allowed
self.required = required
@property
def full_name(self):
return u'field "{0}" in "{1}"'.format(self.name, full_type_name(self.container_cls))
@property
def full_cls_name(self):
name = full_type_name(self.cls)
if name == 'unicode':
# For simplicity, display "unicode" as "str"
name = 'str'
return name
def get(self, presentation, context):
return self.default_get(presentation, context)
def set(self, presentation, context, value):
return self.default_set(presentation, context, value)
def validate(self, presentation, context):
self.default_validate(presentation, context)
def get_locator(self, raw):
if hasattr(raw, '_locator'):
locator = raw._locator
if locator is not None:
return locator.get_child(self.name)
return None
def dump(self, presentation, context):
value = getattr(presentation, self.name)
if value is None:
return
dumper = getattr(self, '_dump_{0}'.format(self.field_variant))
dumper(context, value)
def default_get(self, presentation, context):
# Handle raw
default_raw = (presentation._get_default_raw()
if hasattr(presentation, '_get_default_raw')
else None)
if default_raw is None:
raw = presentation._raw
else:
# Handle default raw value
raw = deepcopy_with_locators(default_raw)
merge(raw, presentation._raw)
# Handle unknown fields (only dict can have unknown fields, lists can't have them)
if self.field_variant == 'primitive_dict_unknown_fields':
return self._get_primitive_dict_unknown_fields(presentation, raw, context)
elif self.field_variant == 'object_dict_unknown_fields':
return self._get_object_dict_unknown_fields(presentation, raw, context)
is_short_form_field = (self.container_cls.SHORT_FORM_FIELD == self.name
if hasattr(self.container_cls, 'SHORT_FORM_FIELD')
else False)
is_dict = isinstance(raw, dict)
# Find value
value = self._find_value(is_short_form_field, is_dict, raw)
# Handle required
if value is None:
if self.required:
raise InvalidValueError(u'required {0} does not have a value'
.format(self.full_name, locator=self.get_locator(raw)))
else:
return None
# Handle allowed values
if self.allowed is not None:
if value not in self.allowed:
raise InvalidValueError(u'{0} is not {1}'
.format(self.full_name,
u' or '.join([safe_repr(v) for v in self.allowed])),
locator=self.get_locator(raw))
# Handle get according to variant
getter = getattr(self, '_get_{0}'.format(self.field_variant), None)
if getter is None:
locator = self.get_locator(raw)
location = (u' @{0}'.format(locator)) if locator is not None else ''
raise AttributeError(u'{0} has unsupported field variant: "{1}"{2}'
.format(self.full_name, self.field_variant, location))
return getter(presentation, raw, value, context)
def _find_value(self, is_short_form_field, is_dict, raw):
value = None
if is_short_form_field and not is_dict:
# Handle short form
value = raw
if value is None:
# An explicit null
value = NULL
elif is_dict:
if self.name in raw:
value = raw[self.name]
if value is None:
# An explicit null
value = NULL
else:
value = self.default
return value
def default_set(self, presentation, context, value):
raw = presentation._raw
old = self.get(presentation, context)
raw[self.name] = value
try:
self.validate(presentation, context)
except Exception as e:
raw[self.name] = old
raise e
return old
def default_validate(self, presentation, context):
value = None
try:
value = self.get(presentation, context)
except AriaException as e:
if e.issue:
context.validation.report(issue=e.issue)
except Exception as e:
context.validation.report(exception=e)
print_exception(e)
self.validate_value(value, context)
def validate_value(self, value, context):
if isinstance(value, list):
if self.field_variant == 'object_list':
for element in value:
if hasattr(element, '_validate'):
element._validate(context)
elif self.field_variant == 'sequenced_object_list':
for _, element in value:
if hasattr(element, '_validate'):
element._validate(context)
elif isinstance(value, dict):
if self.field_variant in ('object_dict', 'object_dict_unknown_fields'):
for inner_value in value.itervalues():
if hasattr(inner_value, '_validate'):
inner_value._validate(context)
if hasattr(value, '_validate'):
value._validate(context)
@staticmethod
def _get_context():
thread_locals = threading.local()
return getattr(thread_locals, 'aria_consumption_context', None)
def _coerce_primitive(self, value, context):
if context is None:
context = Field._get_context()
allow_primitive_coercion = (context.validation.allow_primitive_coersion
if context is not None
else True)
return validate_primitive(value, self.cls, allow_primitive_coercion)
# primitive
def _get_primitive(self, presentation, raw, value, context):
if (self.cls is not None) and (not isinstance(value, self.cls)) \
and (value is not None):
try:
return self._coerce_primitive(value, context)
except ValueError as e:
raise InvalidValueError(u'{0} is not a valid "{1}": {2}'
.format(self.full_name, self.full_cls_name,
safe_repr(value)),
locator=self.get_locator(raw), cause=e)
return value
def _dump_primitive(self, context, value):
if hasattr(value, 'as_raw'):
value = as_raw(value)
puts(u'{0}: {1}'.format(self.name, context.style.literal_style(value)))
# primitive list
def _get_primitive_list(self, presentation, raw, value, context):
if not isinstance(value, list):
raise InvalidValueError(u'{0} is not a list: {1}'
.format(self.full_name, safe_repr(value)),
locator=self.get_locator(raw))
primitive_list = value
if self.cls is not None:
if context is None:
context = Field._get_context()
primitive_list = []
for i, _ in enumerate(value):
primitive = value[i]
if primitive is None:
primitive = NULL
try:
primitive = self._coerce_primitive(primitive, context)
except ValueError as e:
raise InvalidValueError(u'{0} is not a list of "{1}": element {2:d} is {3}'
.format(self.full_name,
self.full_cls_name,
i,
safe_repr(primitive)),
locator=self.get_locator(raw), cause=e)
if primitive in primitive_list:
raise InvalidValueError(u'{0} has a duplicate "{1}": {2}'
.format(self.full_name,
self.full_cls_name,
safe_repr(primitive)),
locator=self.get_locator(raw))
primitive_list.append(primitive)
return FrozenList(primitive_list)
def _dump_primitive_list(self, context, value):
puts(u'{0}:'.format(self.name))
with context.style.indent():
for primitive in value:
if hasattr(primitive, 'as_raw'):
primitive = as_raw(primitive)
puts(context.style.literal_style(primitive))
# primitive dict
def _get_primitive_dict(self, presentation, raw, value, context):
if not isinstance(value, dict):
raise InvalidValueError(u'{0} is not a dict: {1}'
.format(self.full_name, safe_repr(value)),
locator=self.get_locator(raw))
primitive_dict = value
if self.cls is not None:
if context is None:
context = Field._get_context()
primitive_dict = OrderedDict()
for k, v in value.iteritems():
if v is None:
v = NULL
try:
primitive_dict[k] = self._coerce_primitive(v, context)
except ValueError as e:
raise InvalidValueError(u'{0} is not a dict of "{1}" values: entry "{2:d}" '
u'is {3}'
.format(self.full_name, self.full_cls_name, k,
safe_repr(v)),
locator=self.get_locator(raw),
cause=e)
return FrozenDict(primitive_dict)
def _dump_primitive_dict(self, context, value):
puts(u'{0}:'.format(self.name))
with context.style.indent():
for v in value.itervalues():
if hasattr(v, 'as_raw'):
v = as_raw(v)
puts(context.style.literal_style(v))
# object
def _get_object(self, presentation, raw, value, context):
try:
return self.cls(name=self.name, raw=value, container=presentation)
except TypeError as e:
raise InvalidValueError(u'{0} cannot not be initialized to an instance of "{1}": {2}'
.format(self.full_name, self.full_cls_name, safe_repr(value)),
cause=e,
locator=self.get_locator(raw))
def _dump_object(self, context, value):
puts(u'{0}:'.format(self.name))
with context.style.indent():
if hasattr(value, '_dump'):
value._dump(context)
# object list
def _get_object_list(self, presentation, raw, value, context):
if not isinstance(value, list):
raise InvalidValueError(u'{0} is not a list: {1}'
.format(self.full_name, safe_repr(value)),
locator=self.get_locator(raw))
return FrozenList((self.cls(name=self.name, raw=v, container=presentation) for v in value))
def _dump_object_list(self, context, value):
puts(u'{0}:'.format(self.name))
with context.style.indent():
for v in value:
if hasattr(v, '_dump'):
v._dump(context)
# object dict
def _get_object_dict(self, presentation, raw, value, context):
if not isinstance(value, dict):
raise InvalidValueError(u'{0} is not a dict: {1}'
.format(self.full_name, safe_repr(value)),
locator=self.get_locator(raw))
return FrozenDict(((k, self.cls(name=k, raw=v, container=presentation))
for k, v in value.iteritems()))
def _dump_object_dict(self, context, value):
puts(u'{0}:'.format(self.name))
with context.style.indent():
for v in value.itervalues():
if hasattr(v, '_dump'):
v._dump(context)
# sequenced object list
def _get_sequenced_object_list(self, presentation, raw, value, context):
if not isinstance(value, list):
raise InvalidValueError(u'{0} is not a sequenced list (a list of dicts, '
u'each with exactly one key): {1}'
.format(self.full_name, safe_repr(value)),
locator=self.get_locator(raw))
sequence = []
for v in value:
if not isinstance(v, dict):
raise InvalidValueError(u'{0} list elements are not all dicts with '
u'exactly one key: {1}'
.format(self.full_name, safe_repr(value)),
locator=self.get_locator(raw))
if len(v) != 1:
raise InvalidValueError(u'{0} list elements do not all have exactly one key: {1}'
.format(self.full_name, safe_repr(value)),
locator=self.get_locator(raw))
key, value = v.items()[0]
sequence.append((key, self.cls(name=key, raw=value, container=presentation)))
return FrozenList(sequence)
def _dump_sequenced_object_list(self, context, value):
puts(u'{0}:'.format(self.name))
for _, v in value:
if hasattr(v, '_dump'):
v._dump(context)
# primitive dict for unknown fields
def _get_primitive_dict_unknown_fields(self, presentation, raw, context):
if isinstance(raw, dict):
primitive_dict = raw
if self.cls is not None:
if context is None:
context = Field._get_context()
primitive_dict = OrderedDict()
for k, v in raw.iteritems():
if k not in presentation.FIELDS:
if v is None:
v = NULL
try:
primitive_dict[k] = self._coerce_primitive(v, context)
except ValueError as e:
raise InvalidValueError(u'{0} is not a dict of "{1}" values:'
u' entry "{2}" is {3}'
.format(self.full_name, self.full_cls_name,
k, safe_repr(v)),
locator=self.get_locator(raw),
cause=e)
return FrozenDict(primitive_dict)
return None
def _dump_primitive_dict_unknown_fields(self, context, value):
self._dump_primitive_dict(context, value)
# object dict for unknown fields
def _get_object_dict_unknown_fields(self, presentation, raw, context):
if isinstance(raw, dict):
return FrozenDict(((k, self.cls(name=k, raw=v, container=presentation))
for k, v in raw.iteritems() if k not in presentation.FIELDS))
return None
def _dump_object_dict_unknown_fields(self, context, value):
self._dump_object_dict(context, value)