blob: 24718c0ebf2b01b8f0d30811c6230f5f03ce9470 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Contains the Schema classes.
A schema may be one of:
A record, mapping field names to field value data;
An error, equivalent to a record;
An enum, containing one of a small set of symbols;
An array of values, all of the same schema;
A map containing string/value pairs, each of a declared schema;
A union of other schemas;
A fixed sized binary object;
A unicode string;
A sequence of bytes;
A 32-bit signed int;
A 64-bit signed long;
A 32-bit floating-point float;
A 64-bit floating-point double;
A boolean; or
Null.
"""
try:
import json
except ImportError:
import simplejson as json
#
# Constants
#
# Names of the primitive (non-composite) Avro types.
PRIMITIVE_TYPES = ('null', 'boolean', 'string', 'bytes',
                   'int', 'long', 'float', 'double')

# Types that carry a name (and may later be referenced by that name).
NAMED_TYPES = ('fixed', 'enum', 'record', 'error')

# Every type name accepted by Schema.__init__. 'request' (an unnamed record)
# and 'error_union' (a union with an implicit leading 'string' branch) are
# internal types; see RecordSchema and ErrorUnionSchema.
VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + (
    'array', 'map', 'union', 'request', 'error_union')

# Property keys reserved for schema structure; the trailing comments name
# the schema kind each key belongs to.
RESERVED_PROPS = ('type', 'name', 'namespace',
                  'fields',   # Record
                  'items',    # Array
                  'size',     # Fixed
                  'symbols',  # Enum
                  'values')   # Map

# Values accepted for a record field's "order" attribute (see Field).
VALID_FIELD_SORT_ORDERS = ('ascending', 'descending', 'ignore')
#
# Exceptions
#
class AvroException(Exception):
    """Root of the exception hierarchy raised by this module."""


class SchemaParseException(AvroException):
    """Raised when schema data cannot be turned into a Schema object."""
#
# Base Classes
#
class Schema(object):
    """Base class for all Schema classes.

    Each schema keeps its JSON properties in a dict (``props``); the
    'type' property is always set by the constructor.
    """

    def __init__(self, type):
        """
        Validate ``type`` and record it as the 'type' property.
        @arg type: one of VALID_TYPES.
        @raise SchemaParseException: if ``type`` is not a string or is not
            one of VALID_TYPES.
        """
        # Ensure valid ctor args
        if not isinstance(type, basestring):
            fail_msg = 'Schema type must be a string.'
            raise SchemaParseException(fail_msg)
        elif type not in VALID_TYPES:
            fail_msg = '%s is not a valid type.' % type
            raise SchemaParseException(fail_msg)

        # add members; an existing _props is kept (presumably so a subclass
        # can populate properties before calling this constructor).
        if not hasattr(self, '_props'):
            self._props = {}
        self.set_prop('type', type)

    # Read-only properties dict. Printing schemas
    # creates JSON properties directly from this dict.
    props = property(lambda self: self._props)
    type = property(lambda self: self.get_prop('type'))

    # utility functions to manipulate properties dict
    def get_prop(self, key):
        """Return property ``key``, or None when it is not set."""
        return self.props.get(key)

    def set_prop(self, key, value):
        """Set property ``key`` to ``value``."""
        self.props[key] = value

    def __str__(self):
        """Return this schema serialized as a JSON string.

        A fresh Names is used so every named schema is written out in full
        at its first occurrence.
        """
        names = Names()
        return json.dumps(self.to_json(names))

    def __ne__(self, that):
        # Python 2 does not derive != from __eq__; without this, two schemas
        # that compare equal with == would also compare unequal with !=.
        return not self == that

    def to_json(self, names):
        """
        Converts the schema object into its AVRO specification representation.
        Schema types that have names (records, enums, and fixed) must
        be aware of not re-defining schemas that are already listed
        in the parameter names.
        @raise NotImplementedError: always; subclasses must override.
        """
        # NotImplementedError is an Exception subclass, so existing
        # "except Exception" handlers still catch it.
        raise NotImplementedError("Must be implemented by subclasses.")
class Name(object):
    """Class to describe Avro name."""

    def __init__(self, name_attr, space_attr, default_space):
        """
        Formulate full name according to the specification.
        @arg name_attr: name value read in schema or None.
        @arg space_attr: namespace value read in schema or None.
        @arg default_space: the current default space or None.
        @raise SchemaParseException: if name_attr is the empty string, or
            any argument is neither a string nor None.
        """
        # Ensure valid ctor args. Only the name itself must be non-empty:
        # an empty namespace is accepted and treated like None below
        # (get_space() also returns "" for names without a namespace).
        if not (isinstance(name_attr, basestring) or name_attr is None):
            fail_msg = 'Name must be non-empty string or None.'
            raise SchemaParseException(fail_msg)
        elif name_attr == "":
            fail_msg = 'Name must be non-empty string or None.'
            raise SchemaParseException(fail_msg)
        if not (isinstance(space_attr, basestring) or space_attr is None):
            fail_msg = 'Space must be a string or None.'
            raise SchemaParseException(fail_msg)
        if not (isinstance(default_space, basestring) or default_space is None):
            fail_msg = 'Default space must be a string or None.'
            raise SchemaParseException(fail_msg)

        self._full = None
        if name_attr is None:
            return

        if '.' in name_attr:
            # A dotted name is already fully qualified; namespaces are ignored.
            self._full = name_attr
        elif space_attr:
            self._full = "%s.%s" % (space_attr, name_attr)
        elif default_space:
            self._full = "%s.%s" % (default_space, name_attr)
        else:
            self._full = name_attr

    def __eq__(self, other):
        if not isinstance(other, Name):
            return False
        return self.fullname == other.fullname

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        return not self == other

    def __hash__(self):
        # Equal names (same full name) must hash equally.
        return hash(self.fullname)

    fullname = property(lambda self: self._full)

    def get_space(self):
        """
        Back out a namespace from full name.
        @return: None when there is no full name, "" when the full name
            has no namespace part, otherwise everything before the last dot.
        """
        if self._full is None:
            return None
        if self._full.find('.') > 0:
            return self._full.rsplit(".", 1)[0]
        return ""
class Names(object):
    """Track name set and default namespace during parsing."""

    def __init__(self, default_namespace=None):
        # Maps a full name to the schema object registered under it.
        self.names = {}
        # Namespace applied to names that have neither a dot nor an
        # explicit namespace.
        self.default_namespace = default_namespace

    def has_name(self, name_attr, space_attr):
        """Return True if the resolved full name is already registered."""
        test = Name(name_attr, space_attr, self.default_namespace).fullname
        return test in self.names

    def get_name(self, name_attr, space_attr):
        """Return the schema registered under the resolved full name, or None."""
        test = Name(name_attr, space_attr, self.default_namespace).fullname
        return self.names.get(test)

    def add_name(self, name_attr, space_attr, new_schema):
        """
        Add a new schema object to the name set.
        @arg name_attr: name value read in schema
        @arg space_attr: namespace value read in schema.
        @arg new_schema: the schema object to register under the name.
        @return: the Name that was just added.
        @raise SchemaParseException: if the resolved full name is one of
            VALID_TYPES or is already registered.
        """
        to_add = Name(name_attr, space_attr, self.default_namespace)
        if to_add.fullname in VALID_TYPES:
            fail_msg = '%s is a reserved type name.' % to_add.fullname
            raise SchemaParseException(fail_msg)
        elif to_add.fullname in self.names:
            fail_msg = 'The name "%s" is already in use.' % to_add.fullname
            raise SchemaParseException(fail_msg)
        self.names[to_add.fullname] = new_schema
        return to_add
class NamedSchema(Schema):
    """Named Schemas specified in NAMED_TYPES."""

    def __init__(self, type, name, namespace=None, names=None):
        """
        Validate the name, register this schema in ``names`` and record its
        name / namespace properties.
        @arg type: the schema type (one of NAMED_TYPES).
        @arg name: the schema's name as read (may itself be dotted).
        @arg namespace: the namespace as read, or None.
        @arg names: Names registry this schema is added to; a fresh one is
            created when None.
        @raise SchemaParseException: on a missing/non-string name, a
            non-string namespace, or a reserved / already-used full name.
        """
        # Ensure valid ctor args
        if not name:
            fail_msg = 'Named Schemas must have a non-empty name.'
            raise SchemaParseException(fail_msg)
        elif not isinstance(name, basestring):
            fail_msg = 'The name property must be a string.'
            raise SchemaParseException(fail_msg)
        elif namespace is not None and not isinstance(namespace, basestring):
            fail_msg = 'The namespace property must be a string.'
            raise SchemaParseException(fail_msg)

        # Call parent ctor
        Schema.__init__(self, type)

        # names defaults to None, which previously crashed on add_name below.
        if names is None:
            names = Names()

        # Add class members
        new_name = names.add_name(name, namespace, self)

        # Store name and namespace as they were read in origin schema. When a
        # namespace is given, the stored value is the namespace of the
        # resolved full name (it differs from ``namespace`` if name is dotted).
        self.set_prop('name', name)
        if namespace is not None:
            self.set_prop('namespace', new_name.get_space())

        # Store full name as calculated from name, namespace
        self._fullname = new_name.fullname

    def name_ref(self, names):
        """
        Return the reference used for this schema once already written:
        the short name when its namespace property equals
        names.default_namespace, otherwise the full name.
        """
        if self.namespace == names.default_namespace:
            return self.name
        return self.fullname

    # read-only properties
    name = property(lambda self: self.get_prop('name'))
    namespace = property(lambda self: self.get_prop('namespace'))
    fullname = property(lambda self: self._fullname)
class Field(object):
def __init__(self, type, name, has_default, default=None, order=None, names=None):
# Ensure valid ctor args
if not name:
fail_msg = 'Fields must have a non-empty name.'
raise SchemaParseException(fail_msg)
elif not isinstance(name, basestring):
fail_msg = 'The name property must be a string.'
raise SchemaParseException(fail_msg)
elif order is not None and order not in VALID_FIELD_SORT_ORDERS:
fail_msg = 'The order property %s is not valid.' % order
raise SchemaParseException(fail_msg)
# add members
self._props = {}
self._has_default = has_default
if (isinstance(type, basestring) and names is not None
and names.has_name(type, None)):
type_schema = names.get_name(type, None)
else:
try:
type_schema = make_avsc_object(type, names)
except Exception, e:
fail_msg = 'Type property "%s" not a valid Avro schema: %s' % (type, e)
raise SchemaParseException(fail_msg)
self.set_prop('type', type_schema)
self.set_prop('name', name)
# TODO(hammer): check to ensure default is valid
if has_default: self.set_prop('default', default)
if order is not None: self.set_prop('order', order)
# read-only properties
type = property(lambda self: self.get_prop('type'))
name = property(lambda self: self.get_prop('name'))
default = property(lambda self: self.get_prop('default'))
has_default = property(lambda self: self._has_default)
order = property(lambda self: self.get_prop('order'))
props = property(lambda self: self._props)
# utility functions to manipulate properties dict
def get_prop(self, key):
return self.props.get(key)
def set_prop(self, key, value):
self.props[key] = value
def to_json(self, names):
to_dump = self.props.copy()
to_dump['type'] = self.type.to_json(names)
return to_dump
def __eq__(self, that):
to_cmp = json.loads(str(self))
return to_cmp == json.loads(str(that))
#
# Primitive Types
#
class PrimitiveSchema(Schema):
    """Schema for one of the PRIMITIVE_TYPES (null, boolean, int, ...)."""

    def __init__(self, type):
        # Only primitive names are allowed here, a narrower set than the
        # VALID_TYPES check the base constructor performs.
        if type in PRIMITIVE_TYPES:
            Schema.__init__(self, type)
            self.fullname = type
            return
        raise AvroException("%s is not a valid primitive type." % type)

    def to_json(self, names):
        # With only the 'type' property the schema serializes as its bare
        # type name; otherwise the whole property dict is emitted.
        if len(self.props) != 1:
            return self.props
        return self.fullname

    def __eq__(self, that):
        return self.props == that.props
#
# Complex Types (non-recursive)
#
class FixedSchema(NamedSchema):
    """Named schema for a fixed-size binary value."""

    def __init__(self, name, namespace, size, names=None):
        """
        @arg size: number of bytes per value; must be a non-negative int.
        @raise SchemaParseException: if size is invalid or the name is
            invalid / already used. SchemaParseException subclasses
            AvroException, so handlers for AvroException still apply.
        """
        # Ensure valid ctor args. bool is a subclass of int, so JSON
        # true/false must be excluded explicitly; a negative byte count is
        # meaningless.
        if not isinstance(size, int) or isinstance(size, bool) or size < 0:
            fail_msg = 'Fixed Schema requires a valid integer for size property.'
            raise SchemaParseException(fail_msg)

        # Call parent ctor
        NamedSchema.__init__(self, 'fixed', name, namespace, names)

        # Add class members
        self.set_prop('size', size)

    # read-only properties
    size = property(lambda self: self.get_prop('size'))

    def to_json(self, names):
        """
        Return a name reference if this schema was already written into
        ``names``; otherwise register it there and return its properties.
        """
        if self.fullname in names.names:
            return self.name_ref(names)
        names.names[self.fullname] = self
        return self.props

    def __eq__(self, that):
        return self.props == that.props
class EnumSchema(NamedSchema):
    """Named schema whose values are one of a list of string symbols."""

    def __init__(self, name, namespace, symbols, names=None):
        """
        @arg symbols: list of distinct strings.
        @raise SchemaParseException: if symbols is not a list, contains a
            non-string or contains duplicates, or the name is invalid.
            SchemaParseException subclasses AvroException, so handlers for
            AvroException still apply.
        """
        # Ensure valid ctor args
        if not isinstance(symbols, list):
            fail_msg = 'Enum Schema requires a JSON array for the symbols property.'
            raise SchemaParseException(fail_msg)
        for symbol in symbols:
            if not isinstance(symbol, basestring):
                fail_msg = 'Enum Schema requires all symbols to be JSON strings.'
                raise SchemaParseException(fail_msg)

        # Report the repeated symbols themselves (in first-repeat order)
        # rather than the whole list.
        seen = set()
        duplicates = []
        for symbol in symbols:
            if symbol not in seen:
                seen.add(symbol)
            elif symbol not in duplicates:
                duplicates.append(symbol)
        if duplicates:
            fail_msg = 'Duplicate symbol: %s' % ', '.join(duplicates)
            raise SchemaParseException(fail_msg)

        # Call parent ctor
        NamedSchema.__init__(self, 'enum', name, namespace, names)

        # Add class members
        self.set_prop('symbols', symbols)

    # read-only properties
    symbols = property(lambda self: self.get_prop('symbols'))

    def to_json(self, names):
        """
        Return a name reference if this schema was already written into
        ``names``; otherwise register it there and return its properties.
        """
        if self.fullname in names.names:
            return self.name_ref(names)
        names.names[self.fullname] = self
        return self.props

    def __eq__(self, that):
        return self.props == that.props
#
# Complex Types (recursive)
#
class ArraySchema(Schema):
def __init__(self, items, names=None):
# Call parent ctor
Schema.__init__(self, 'array')
# Add class members
if isinstance(items, basestring) and names.has_name(items, None):
items_schema = names.get_name(items, None)
else:
try:
items_schema = make_avsc_object(items, names)
except SchemaParseException, e:
fail_msg = 'Items schema (%s) not a valid Avro schema: %s (known names: %s)' % (items, e, names.names.keys())
raise SchemaParseException(fail_msg)
self.set_prop('items', items_schema)
# read-only properties
items = property(lambda self: self.get_prop('items'))
def to_json(self, names):
to_dump = self.props.copy()
item_schema = self.get_prop('items')
to_dump['items'] = item_schema.to_json(names)
return to_dump
def __eq__(self, that):
to_cmp = json.loads(str(self))
return to_cmp == json.loads(str(that))
class MapSchema(Schema):
def __init__(self, values, names=None):
# Call parent ctor
Schema.__init__(self, 'map')
# Add class members
if isinstance(values, basestring) and names.has_name(values, None):
values_schema = names.get_name(values, None)
else:
try:
values_schema = make_avsc_object(values, names)
except:
fail_msg = 'Values schema not a valid Avro schema.'
raise SchemaParseException(fail_msg)
self.set_prop('values', values_schema)
# read-only properties
values = property(lambda self: self.get_prop('values'))
def to_json(self, names):
to_dump = self.props.copy()
to_dump['values'] = self.get_prop('values').to_json(names)
return to_dump
def __eq__(self, that):
to_cmp = json.loads(str(self))
return to_cmp == json.loads(str(that))
class UnionSchema(Schema):
"""
names is a dictionary of schema objects
"""
def __init__(self, schemas, names=None):
# Ensure valid ctor args
if not isinstance(schemas, list):
fail_msg = 'Union schema requires a list of schemas.'
raise SchemaParseException(fail_msg)
# Call parent ctor
Schema.__init__(self, 'union')
# Add class members
schema_objects = []
for schema in schemas:
if isinstance(schema, basestring) and names.has_name(schema, None):
new_schema = names.get_name(schema, None)
else:
try:
new_schema = make_avsc_object(schema, names)
except Exception, e:
raise SchemaParseException('Union item must be a valid Avro schema: %s' % str(e))
# check the new schema
if (new_schema.type in VALID_TYPES and new_schema.type not in NAMED_TYPES
and new_schema.type in [schema.type for schema in schema_objects]):
raise SchemaParseException('%s type already in Union' % new_schema.type)
elif new_schema.type == 'union':
raise SchemaParseException('Unions cannot contain other unions.')
else:
schema_objects.append(new_schema)
self._schemas = schema_objects
# read-only properties
schemas = property(lambda self: self._schemas)
def to_json(self, names):
to_dump = []
for schema in self.schemas:
to_dump.append(schema.to_json(names))
return to_dump
def __eq__(self, that):
to_cmp = json.loads(str(self))
return to_cmp == json.loads(str(that))
class ErrorUnionSchema(UnionSchema):
    """
    Union of declared error schemas plus an implicit leading "string"
    branch for system errors. Branches of type 'string' are left out when
    serializing.
    """

    def __init__(self, schemas, names=None):
        all_branches = ['string'] + schemas
        UnionSchema.__init__(self, all_branches, names)

    def to_json(self, names):
        return [branch.to_json(names) for branch in self.schemas
                if branch.type != 'string']
class RecordSchema(NamedSchema):
    """
    Schema for 'record' and 'error' types, and for unnamed 'request'
    records (whose fields serialize as a bare list).
    """

    @staticmethod
    def make_field_objects(field_data, names):
        """
        Build Field objects from a list of field JSON objects.
        We're going to need to make message parameters too.
        @raise SchemaParseException: if an entry is not a JSON object, or a
            field name is repeated.
        """
        field_objects = []
        field_names = set()
        for field in field_data:
            if not (hasattr(field, 'get') and callable(field.get)):
                raise SchemaParseException('Not a valid field: %s' % field)
            # null values can have a default value of None, so presence of
            # the key (not its value) decides has_default.
            has_default = 'default' in field
            new_field = Field(field.get('type'), field.get('name'), has_default,
                              field.get('default'), field.get('order'), names)
            # make sure field name has not been used yet
            if new_field.name in field_names:
                fail_msg = 'Field name %s already in use.' % new_field.name
                raise SchemaParseException(fail_msg)
            field_names.add(new_field.name)
            field_objects.append(new_field)
        return field_objects

    def __init__(self, name, namespace, fields, names=None, schema_type='record'):
        """
        @arg fields: list of field JSON objects.
        @arg names: Names registry shared with nested schemas; a fresh one
            is created when None.
        @arg schema_type: 'record', 'error' or 'request'.
        @raise SchemaParseException: if fields is missing or not a list, or a
            field / name is invalid.
        """
        # Ensure valid ctor args
        if fields is None:
            fail_msg = 'Record schema requires a non-empty fields property.'
            raise SchemaParseException(fail_msg)
        elif not isinstance(fields, list):
            fail_msg = 'Fields property must be a list of Avro schemas.'
            raise SchemaParseException(fail_msg)

        if names is None:
            names = Names()

        # Call parent ctor (adds own name to namespace, too)
        if schema_type == 'request':
            Schema.__init__(self, schema_type)
        else:
            NamedSchema.__init__(self, schema_type, name, namespace, names)

        if schema_type in ('record', 'error'):
            # Errors are records too: types declared inside either one take
            # its namespace as the default while its fields are parsed.
            old_default = names.default_namespace
            names.default_namespace = Name(name, namespace,
                                           names.default_namespace).get_space()
            try:
                field_objects = RecordSchema.make_field_objects(fields, names)
            finally:
                # Restore even if a field fails to parse, so a shared Names
                # is not left with this record's namespace as its default.
                names.default_namespace = old_default
        else:
            field_objects = RecordSchema.make_field_objects(fields, names)

        # Add class members
        self.set_prop('fields', field_objects)

    # read-only properties
    fields = property(lambda self: self.get_prop('fields'))

    @property
    def fields_dict(self):
        """Map each field name to its Field object."""
        fields_dict = {}
        for field in self.fields:
            fields_dict[field.name] = field
        return fields_dict

    def to_json(self, names):
        """
        Serialize this record. A 'request' returns its field list; otherwise
        a name reference is returned if already written into ``names``, else
        the record is registered and written in full.
        """
        # Request records don't have names
        if self.type == 'request':
            return [f.to_json(names) for f in self.fields]

        if self.fullname in names.names:
            return self.name_ref(names)
        names.names[self.fullname] = self

        to_dump = self.props.copy()
        to_dump['fields'] = [f.to_json(names) for f in self.fields]
        return to_dump

    def __eq__(self, that):
        to_cmp = json.loads(str(self))
        return to_cmp == json.loads(str(that))
#
# Module Methods
#
# TODO(hammer): handle non-reserved properties
def make_avsc_object(json_data, names=None):
    """
    Build Avro Schema from data parsed out of JSON string.
    @arg json_data: parsed JSON: a dict (schema object), a list (union) or
        a primitive type name.
    @arg names: A Names object (tracks seen names and default space); a
        fresh one is created when None.
    @return: the Schema object built from json_data.
    @raise SchemaParseException: (or another AvroException raised by a
        schema constructor) if json_data does not describe a valid schema.
    """
    if names is None:
        names = Names()

    # JSON object (non-union)
    if hasattr(json_data, 'get') and callable(json_data.get):
        schema_type = json_data.get('type')
        if schema_type in PRIMITIVE_TYPES:
            return PrimitiveSchema(schema_type)
        elif schema_type in NAMED_TYPES:
            name = json_data.get('name')
            namespace = json_data.get('namespace')
            if schema_type == 'fixed':
                size = json_data.get('size')
                return FixedSchema(name, namespace, size, names)
            elif schema_type == 'enum':
                symbols = json_data.get('symbols')
                return EnumSchema(name, namespace, symbols, names)
            elif schema_type in ['record', 'error']:
                fields = json_data.get('fields')
                return RecordSchema(name, namespace, fields, names, schema_type)
            else:
                raise SchemaParseException('Unknown Named Type: %s' % schema_type)
        elif schema_type in VALID_TYPES:
            if schema_type == 'array':
                items = json_data.get('items')
                return ArraySchema(items, names)
            elif schema_type == 'map':
                values = json_data.get('values')
                return MapSchema(values, names)
            elif schema_type == 'error_union':
                declared_errors = json_data.get('declared_errors')
                return ErrorUnionSchema(declared_errors, names)
            else:
                raise SchemaParseException('Unknown Valid Type: %s' % schema_type)
        elif schema_type is None:
            raise SchemaParseException('No "type" property: %s' % json_data)
        else:
            raise SchemaParseException('Undefined type: %s' % schema_type)
    # JSON array (union)
    elif isinstance(json_data, list):
        return UnionSchema(json_data, names)
    # JSON string (primitive)
    elif json_data in PRIMITIVE_TYPES:
        return PrimitiveSchema(json_data)
    # not for us!
    else:
        # Wrap in a 1-tuple so that any value (even a tuple) formats safely.
        fail_msg = "Could not make an Avro Schema object from %s." % (json_data,)
        raise SchemaParseException(fail_msg)
# TODO(hammer): make method for reading from a file?
def parse(json_string):
"""Constructs the Schema from the JSON text."""
# TODO(hammer): preserve stack trace from JSON parse
# parse the JSON
try:
json_data = json.loads(json_string)
except:
raise SchemaParseException('Error parsing JSON: %s' % json_string)
# Initialize the names object
names = Names()
# construct the Avro Schema object
return make_avsc_object(json_data, names)